Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.parquet.filter2.bloomfilterlevel.BloomFilterImpl;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
Expand All @@ -34,8 +35,6 @@
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Given a {@link Filter} applies it to a list of BlockMetaData (row groups)
Expand Down Expand Up @@ -103,11 +102,13 @@ public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredic
drop = StatisticsFilter.canDrop(filterPredicate, block.getColumns());
}

if(!drop && levels.contains(FilterLevel.DICTIONARY)) {
drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block));
// Marks whether the dictionary filter was able to use a column's dictionary; if it was, the `BloomFilter` check is skipped to save time.
AtomicBoolean hasDictionaryUsed = new AtomicBoolean(false);
if (!drop && levels.contains(FilterLevel.DICTIONARY)) {
drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block), hasDictionaryUsed);
}

if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) {
if (!drop && !hasDictionaryUsed.get() && levels.contains(FilterLevel.BLOOMFILTER)) {
drop = BloomFilterImpl.canDrop(filterPredicate, block.getColumns(), reader.getBloomFilterDataReader(block));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;

/**
Expand All @@ -52,20 +53,27 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
private static final boolean BLOCK_MIGHT_MATCH = false;
private static final boolean BLOCK_CANNOT_MATCH = true;

public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, DictionaryPageReadStore dictionaries) {
public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, DictionaryPageReadStore dictionaries,
AtomicBoolean hasDictionaryUsed) {
Objects.requireNonNull(pred, "pred cannnot be null");
Objects.requireNonNull(columns, "columns cannnot be null");
return pred.accept(new DictionaryFilter(columns, dictionaries));
return pred.accept(new DictionaryFilter(columns, dictionaries, hasDictionaryUsed));
}

/**
 * Convenience overload for callers that do not need to know whether a dictionary was used.
 * Delegates to the four-argument {@code canDrop}, passing a fresh {@link AtomicBoolean}
 * (initially {@code false}) whose final value is discarded.
 *
 * @param pred the filter predicate to evaluate
 * @param columns the column chunk metadata of the row group
 * @param dictionaries the store used to read dictionary pages
 * @return the result of the four-argument {@code canDrop}
 */
public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, DictionaryPageReadStore dictionaries) {
  // This overload has no out-parameter, so the flag is a throwaway local.
  final AtomicBoolean discardedFlag = new AtomicBoolean(false);
  return canDrop(pred, columns, dictionaries, discardedFlag);
}

private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
private final DictionaryPageReadStore dictionaries;
private AtomicBoolean hasDictionaryUsed;

private DictionaryFilter(List<ColumnChunkMetaData> columnsList, DictionaryPageReadStore dictionaries) {
private DictionaryFilter(List<ColumnChunkMetaData> columnsList, DictionaryPageReadStore dictionaries,
AtomicBoolean hasDictionaryUsed) {
for (ColumnChunkMetaData chunk : columnsList) {
columns.put(chunk.getPath(), chunk);
}

this.hasDictionaryUsed = hasDictionaryUsed;
this.dictionaries = dictionaries;
}

Expand Down Expand Up @@ -113,7 +121,9 @@ private <T extends Comparable<T>> Set<T> expandDictionary(ColumnChunkMetaData me
for (int i = 0; i <= dict.getMaxId(); i++) {
dictSet.add((T) dictValueProvider.apply(i));
}

if (!hasNonDictionaryPages(meta)) {
hasDictionaryUsed.set(true);
}
return dictSet;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
Expand Down Expand Up @@ -333,8 +334,16 @@ public void testEqFixed() throws Exception {
canDrop(eq(b, toBinary("-2", 17)), ccmd, dictionaries));
}

AtomicBoolean hasDictionaryUsed = new AtomicBoolean(false);
assertFalse("Should not drop block for -1",
canDrop(eq(b, toBinary("-1", 17)), ccmd, dictionaries));
canDrop(eq(b, toBinary("-1", 17)), ccmd, dictionaries, hasDictionaryUsed));

// Only V2 supports dictionary encoding for FIXED_LEN_BYTE_ARRAY values
if (version == PARQUET_2_0) {
assertTrue(hasDictionaryUsed.get());
} else {
assertFalse(hasDictionaryUsed.get());
}

assertFalse("Should not drop block for null",
canDrop(eq(b, null), ccmd, dictionaries));
Expand Down