6 changes: 6 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
@@ -1521,3 +1521,9 @@ with Parquet files performed by the Iceberg connector.
for structural data types. The equivalent catalog session property is
``parquet_optimized_nested_reader_enabled``.
- ``true``
+   * - ``parquet.use-bloom-filter``
+     - Whether bloom filters are used for predicate pushdown when reading
+       Parquet files. Set this property to ``false`` to disable the use of
+       bloom filters by default. The equivalent catalog session property is
+       ``parquet_use_bloom_filter``.
+     - ``true``
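
As a usage note (not part of this change): the config property sets the default, and the session property can override it per session, for example with ``SET SESSION <catalog>.parquet_use_bloom_filter = false``. A test can do the same programmatically; a minimal Java sketch, assuming an ``io.trino.Session`` named ``session`` that has a catalog set:

    // Minimal sketch: derive a session with Parquet bloom filter pushdown disabled.
    Session bloomFiltersDisabled = Session.builder(session)
            .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "parquet_use_bloom_filter", "false")
            .build();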
io/trino/parquet/BloomFilterStore.java
@@ -16,6 +16,9 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.BasicSliceInput;
import io.airlift.slice.Slice;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.TupleDomain;
+import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.format.BloomFilterHeader;
@@ -32,6 +35,7 @@
import java.util.Set;

import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
@@ -92,6 +96,30 @@ public Optional<BloomFilter> getBloomFilter(ColumnPath columnPath)
}
}

+    public static Optional<BloomFilterStore> getBloomFilterStore(
+            ParquetDataSource dataSource,
+            BlockMetaData blockMetadata,
+            TupleDomain<ColumnDescriptor> parquetTupleDomain,
+            ParquetReaderOptions options)
+    {
+        if (!options.useBloomFilter() || parquetTupleDomain.isAll() || parquetTupleDomain.isNone()) {
+            return Optional.empty();
+        }
+
+        boolean hasBloomFilter = blockMetadata.getColumns().stream().anyMatch(BloomFilterStore::hasBloomFilter);
+        if (!hasBloomFilter) {
+            return Optional.empty();
+        }
+
+        Map<ColumnDescriptor, Domain> parquetDomains = parquetTupleDomain.getDomains()
+                .orElseThrow(() -> new IllegalStateException("Predicate other than none should have domains"));
+        Set<ColumnPath> columnsFilteredPaths = parquetDomains.keySet().stream()
+                .map(column -> ColumnPath.get(column.getPath()))
+                .collect(toImmutableSet());
+
+        return Optional.of(new BloomFilterStore(dataSource, blockMetadata, columnsFilteredPaths));
+    }
+
public static boolean hasBloomFilter(ColumnChunkMetaData columnMetaData)
{
return columnMetaData.getBloomFilterOffset() > 0;
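
To illustrate how the relocated store is consumed, here is a hedged sketch (``dataSource``, ``block``, ``parquetTupleDomain``, and ``options`` are assumed to be in scope, and the column path and probed value are made up); a bloom filter can only prove absence, so a miss is what licenses skipping I/O:

    // Sketch: probe one column chunk's bloom filter for the literal 42.
    Optional<BloomFilterStore> store = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);
    boolean mayContain = store
            .flatMap(s -> s.getBloomFilter(ColumnPath.get("datacolumn")))
            .map(filter -> filter.findHash(filter.hash(42))) // hash(int) and findHash are parquet-mr BloomFilter methods
            .orElse(true); // no usable filter: the value cannot be ruled out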
io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -73,6 +73,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
+import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
import static io.trino.parquet.ParquetTypeUtils.constructField;
import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
@@ -394,30 +395,6 @@ public static Optional<ColumnIndexStore> getColumnIndexStore(
return Optional.of(new TrinoColumnIndexStore(dataSource, blockMetadata, columnsReadPaths, columnsFilteredPaths));
}

-    public static Optional<BloomFilterStore> getBloomFilterStore(
-            ParquetDataSource dataSource,
-            BlockMetaData blockMetadata,
-            TupleDomain<ColumnDescriptor> parquetTupleDomain,
-            ParquetReaderOptions options)
-    {
-        if (!options.useBloomFilter() || parquetTupleDomain.isAll() || parquetTupleDomain.isNone()) {
-            return Optional.empty();
-        }
-
-        boolean hasBloomFilter = blockMetadata.getColumns().stream().anyMatch(BloomFilterStore::hasBloomFilter);
-        if (!hasBloomFilter) {
-            return Optional.empty();
-        }
-
-        Map<ColumnDescriptor, Domain> parquetDomains = parquetTupleDomain.getDomains()
-                .orElseThrow(() -> new IllegalStateException("Predicate other than none should have domains"));
-        Set<ColumnPath> columnsFilteredPaths = parquetDomains.keySet().stream()
-                .map(column -> ColumnPath.get(column.getPath()))
-                .collect(toImmutableSet());
-
-        return Optional.of(new BloomFilterStore(dataSource, blockMetadata, columnsFilteredPaths));
-    }
-
public static TupleDomain<ColumnDescriptor> getParquetTupleDomain(
Map<List<String>, ColumnDescriptor> descriptorsByPath,
TupleDomain<HiveColumnHandle> effectivePredicate,
io/trino/plugin/hive/parquet/TestHiveParquetWithBloomFilters.java
@@ -14,24 +14,22 @@
package io.trino.plugin.hive.parquet;

import com.google.common.collect.ImmutableList;
-import io.trino.Session;
import io.trino.plugin.hive.HiveQueryRunner;
-import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.spi.connector.CatalogSchemaTableName;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.testing.BaseTestParquetWithBloomFilters;
+import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.mapred.JobConf;
import org.joda.time.DateTimeZone;
-import org.testng.annotations.Test;

import java.io.File;
-import java.nio.file.Files;
-import java.util.Arrays;
+import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

-import static com.google.common.io.MoreFiles.deleteRecursively;
-import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
@@ -42,96 +40,36 @@
import static org.apache.parquet.format.CompressionCodec.SNAPPY;
import static org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED;
import static org.apache.parquet.hadoop.ParquetOutputFormat.WRITER_VERSION;
-import static org.assertj.core.api.Assertions.assertThat;

public class TestHiveParquetWithBloomFilters
-        extends AbstractTestQueryFramework
+        extends BaseTestParquetWithBloomFilters
{
-    private static final String COLUMN_NAME = "dataColumn";
-    // containing extreme values, so the row group cannot be eliminated by the column chunk's min/max statistics
-    private static final List<Integer> TEST_VALUES = Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE, 1, 3, 7, 10, 15);
-    private static final int MISSING_VALUE = 0;
-
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
-        return HiveQueryRunner.builder().build();
+        DistributedQueryRunner queryRunner = HiveQueryRunner.builder().build();
+        dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data");
+        return queryRunner;
}

-    @Test
-    public void verifyBloomFilterEnabled()
-    {
-        assertThat(query(format("SHOW SESSION LIKE '%s.parquet_use_bloom_filter'", getSession().getCatalog().orElseThrow())))
-                .skippingTypesCheck()
-                .matches(result -> result.getRowCount() == 1)
-                .matches(result -> {
-                    String value = (String) result.getMaterializedRows().get(0).getField(1);
-                    return value.equals("true");
-                });
-    }
-
-    @Test
-    public void testBloomFilterRowGroupPruning()
-            throws Exception
+    @Override
+    protected CatalogSchemaTableName createParquetTableWithBloomFilter(String columnName, List<Integer> testValues)
{
-        File tmpDir = Files.createTempDirectory("testBloomFilterRowGroupPruning").toFile();
-        try {
-            File parquetFile = new File(tmpDir, randomNameSuffix());
-
-            String tableName = "parquet_with_bloom_filters_" + randomNameSuffix();
-            createParquetBloomFilterSource(parquetFile, COLUMN_NAME, TEST_VALUES);
-            assertUpdate(
-                    format(
-                            "CREATE TABLE %s (%s INT) WITH (format = 'PARQUET', external_location = '%s')",
-                            tableName,
-                            COLUMN_NAME,
-                            tmpDir.getAbsolutePath()));
-
-            // When reading bloom filter is enabled, row groups are pruned when searching for a missing value
-            assertQueryStats(
-                    getSession(),
-                    "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + MISSING_VALUE,
-                    queryStats -> {
-                        assertThat(queryStats.getPhysicalInputPositions()).isEqualTo(0);
-                        assertThat(queryStats.getProcessedInputPositions()).isEqualTo(0);
-                    },
-                    results -> assertThat(results.getRowCount()).isEqualTo(0));
+        // create the managed table
+        String tableName = "parquet_with_bloom_filters_" + randomNameSuffix();
+        CatalogSchemaTableName catalogSchemaTableName = new CatalogSchemaTableName("hive", new SchemaTableName("tpch", tableName));
+        assertUpdate(format("CREATE TABLE %s (%s INT) WITH (format = 'PARQUET')", catalogSchemaTableName, columnName));

-            // When reading bloom filter is enabled, row groups are not pruned when searching for a value present in the file
-            assertQueryStats(
-                    getSession(),
-                    "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + TEST_VALUES.get(0),
-                    queryStats -> {
-                        assertThat(queryStats.getPhysicalInputPositions()).isGreaterThan(0);
-                        assertThat(queryStats.getProcessedInputPositions()).isEqualTo(queryStats.getPhysicalInputPositions());
-                    },
-                    results -> assertThat(results.getRowCount()).isEqualTo(1));
+        // directly write data to the managed table
+        Path tableLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableName));
+        Path fileLocation = tableLocation.resolve("bloomFilterFile.parquet");
+        writeParquetFileWithBloomFilter(fileLocation.toFile(), columnName, testValues);

-            // When reading bloom filter is disabled, row groups are not pruned when searching for a missing value
-            assertQueryStats(
-                    bloomFiltersDisabled(getSession()),
-                    "SELECT * FROM " + tableName + " WHERE " + COLUMN_NAME + " = " + MISSING_VALUE,
-                    queryStats -> {
-                        assertThat(queryStats.getPhysicalInputPositions()).isGreaterThan(0);
-                        assertThat(queryStats.getProcessedInputPositions()).isEqualTo(queryStats.getPhysicalInputPositions());
-                    },
-                    results -> assertThat(results.getRowCount()).isEqualTo(0));
-        }
-        finally {
-            deleteRecursively(tmpDir.toPath(), ALLOW_INSECURE);
-        }
+        return catalogSchemaTableName;
}

-    private static Session bloomFiltersDisabled(Session session)
-    {
-        return Session.builder(session)
-                .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "parquet_use_bloom_filter", "false")
-                .build();
-    }
-
-    private static void createParquetBloomFilterSource(File tempFile, String columnName, List<Integer> testValues)
-            throws Exception
+    public static void writeParquetFileWithBloomFilter(File tempFile, String columnName, List<Integer> testValues)
{
List<ObjectInspector> objectInspectors = singletonList(javaIntObjectInspector);
List<String> columnNames = ImmutableList.of(columnName);
@@ -140,15 +78,20 @@ private static void createParquetBloomFilterSource(File tempFile, String columnN
jobConf.setEnum(WRITER_VERSION, PARQUET_1_0);
jobConf.setBoolean(BLOOM_FILTER_ENABLED, true);

-        ParquetTester.writeParquetColumn(
-                jobConf,
-                tempFile,
-                SNAPPY,
-                ParquetTester.createTableProperties(columnNames, objectInspectors),
-                getStandardStructObjectInspector(columnNames, objectInspectors),
-                new Iterator<?>[] {testValues.iterator()},
-                Optional.empty(),
-                false,
-                DateTimeZone.getDefault());
+        try {
+            ParquetTester.writeParquetColumn(
+                    jobConf,
+                    tempFile,
+                    SNAPPY,
+                    ParquetTester.createTableProperties(columnNames, objectInspectors),
+                    getStandardStructObjectInspector(columnNames, objectInspectors),
+                    new Iterator<?>[] {testValues.iterator()},
+                    Optional.empty(),
+                    false,
+                    DateTimeZone.getDefault());
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
}
}
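
``BaseTestParquetWithBloomFilters`` itself is not shown in this diff. A plausible sketch of the pruning assertion it centralizes, reconstructed from the Hive-specific test removed above (``tableName``, ``columnName``, and ``missingValue`` are assumed names):

    // Sketch: with bloom filters enabled, a point lookup on a value that is
    // absent from every row group should read and process zero positions.
    assertQueryStats(
            getSession(),
            "SELECT * FROM " + tableName + " WHERE " + columnName + " = " + missingValue,
            queryStats -> {
                assertThat(queryStats.getPhysicalInputPositions()).isEqualTo(0);
                assertThat(queryStats.getProcessedInputPositions()).isEqualTo(0);
            },
            results -> assertThat(results.getRowCount()).isEqualTo(0));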
io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -34,6 +34,7 @@
import io.trino.orc.TupleDomainOrcPredicate;
import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder;
import io.trino.orc.metadata.OrcType;
+import io.trino.parquet.BloomFilterStore;
import io.trino.parquet.Field;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
@@ -131,6 +132,7 @@
import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
import static io.trino.orc.OrcReader.ProjectedLayout;
import static io.trino.orc.OrcReader.fullyProjectedLayout;
+import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
@@ -156,6 +158,7 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isParquetOptimizedNestedReaderEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isParquetOptimizedReaderEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
+import static io.trino.plugin.iceberg.IcebergSessionProperties.useParquetBloomFilter;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
@@ -510,6 +513,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
.withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
.withBatchColumnReaders(isParquetOptimizedReaderEnabled(session))
+                        .withBloomFilter(useParquetBloomFilter(session))
.withBatchNestedColumnReaders(isParquetOptimizedNestedReaderEnabled(session)),
predicate,
fileFormatDataSourceStats,
@@ -927,8 +931,10 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
List<BlockMetaData> blocks = new ArrayList<>();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+            Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);
+
if (start <= firstDataPage && firstDataPage < start + length &&
-                    predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty(), Optional.empty(), UTC, ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
+                    predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty(), bloomFilterStore, UTC, ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
blocks.add(block);
blockStarts.add(nextStart);
if (startRowPosition.isEmpty()) {
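
Threading ``bloomFilterStore`` into ``predicateMatches`` lets the reader drop a row group whose filters definitively exclude every value of a discrete predicate. A simplified sketch of that decision (assumed names, not the actual ``TupleDomainParquetPredicate`` code):

    // Sketch: a row group may match only if at least one sought value is
    // possibly present according to the column's bloom filter.
    private static boolean rowGroupMayMatch(BloomFilter filter, List<Long> soughtValues)
    {
        for (long value : soughtValues) {
            if (filter.findHash(filter.hash(value))) {
                return true; // "maybe present": the row group must be read
            }
        }
        return false; // every value definitely absent: safe to prune
    }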
io/trino/plugin/iceberg/IcebergSessionProperties.java
@@ -67,6 +67,7 @@ public final class IcebergSessionProperties
private static final String ORC_WRITER_MAX_STRIPE_ROWS = "orc_writer_max_stripe_rows";
private static final String ORC_WRITER_MAX_DICTIONARY_MEMORY = "orc_writer_max_dictionary_memory";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
+    private static final String PARQUET_USE_BLOOM_FILTER = "parquet_use_bloom_filter";
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled";
private static final String PARQUET_OPTIMIZED_NESTED_READER_ENABLED = "parquet_optimized_nested_reader_enabled";
@@ -197,6 +198,11 @@ public IcebergSessionProperties(
"Parquet: Maximum size of a block to read",
parquetReaderConfig.getMaxReadBlockSize(),
false))
+                .add(booleanProperty(
+                        PARQUET_USE_BLOOM_FILTER,
+                        "Parquet: Enable using Parquet bloom filter",
+                        parquetReaderConfig.isUseBloomFilter(),
+                        false))
.add(integerProperty(
PARQUET_MAX_READ_BLOCK_ROW_COUNT,
"Parquet: Maximum number of rows read in a batch",
@@ -432,6 +438,11 @@ public static int getParquetWriterBatchSize(ConnectorSession session)
return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class);
}

+    public static boolean useParquetBloomFilter(ConnectorSession session)
+    {
+        return session.getProperty(PARQUET_USE_BLOOM_FILTER, Boolean.class);
+    }
+
public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
{
return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);