26 changes: 18 additions & 8 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -46,6 +46,7 @@
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.CloseableIterable;
@@ -582,6 +583,12 @@ public <D> CloseableIterable<D> build() {
optionsBuilder = ParquetReadOptions.builder();
}

if (filter != null && !filter.equals(Expressions.alwaysTrue()) &&
ParquetFilters.isSupportedFilter(filter)) {
optionsBuilder.useRecordFilter(filterRecords);
optionsBuilder.withRecordFilter(ParquetFilters.convert(getSchemaFromFile(), filter, caseSensitive));
}

for (Map.Entry<String, String> entry : properties.entrySet()) {
optionsBuilder.set(entry.getKey(), entry.getValue());
}
@@ -623,17 +630,10 @@ public <D> CloseableIterable<D> build() {
if (filter != null) {
// TODO: should not need to get the schema to push down before opening the file.
// Parquet should allow setting a filter inside its read support
MessageType type;
Reviewer comment (Contributor):
The comment above sounds like another possible reason why we wanted to reimplement filters. Tailoring the filter to each file is difficult, compared to evaluating the same filter for every file.

If I remember correctly, the need to tailor the filter to each file comes from id-based column resolution. The file might contain 1: a int while Iceberg has a filter on 1: x long. Unless the filter is translated to use a instead of x, Parquet will skip the file because it thinks the column doesn't exist (and is therefore all nulls).
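
To make this concrete, here is a minimal sketch of the per-file conversion being discussed, assuming it runs inside the ReadBuilder in Parquet.java where file, filter, and caseSensitive are in scope; the schemas in the comments are the hypothetical ones from this thread, not code in this PR:

// Hypothetical mismatch described above:
//   table schema: 1: x long        file schema: 1: a int   (same field id, older column)
// The filter is converted against the schema read from this file's footer rather than the
// table schema, so the resulting Parquet predicate refers to the columns as this file
// declares them; a predicate on a column the file does not contain would otherwise be
// evaluated by Parquet as an all-null column.
Schema fileSchema = getSchemaFromFile();   // helper added in this PR; reads this file's footer schema
optionsBuilder.withRecordFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));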

try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) {
type = schemaReader.getFileMetaData().getSchema();
} catch (IOException e) {
throw new RuntimeIOException(e);
}
Schema fileSchema = ParquetSchemaUtil.convert(type);
builder.useStatsFilter()
.useDictionaryFilter()
.useRecordFilter(filterRecords)
.withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
.withFilter(ParquetFilters.convert(getSchemaFromFile(), filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
@@ -655,6 +655,16 @@ public <D> CloseableIterable<D> build() {

return new ParquetIterable<>(builder);
}

private Schema getSchemaFromFile() {
MessageType type;
try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) {
type = schemaReader.getFileMetaData().getSchema();
} catch (IOException e) {
throw new RuntimeIOException(e);
}
return ParquetSchemaUtil.convert(type);
}
}

private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -20,7 +20,9 @@
package org.apache.iceberg.parquet;

import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.And;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
@@ -29,7 +31,10 @@
import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.Not;
import org.apache.iceberg.expressions.Or;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -38,9 +42,37 @@

class ParquetFilters {

private static final Set<Operation> SUPPORTED_OPS = ImmutableSet.of(
Operation.IS_NULL,
Operation.NOT_NULL,
Operation.EQ,
Operation.NOT_EQ,
Operation.GT,
Operation.GT_EQ,
Operation.LT,
Operation.LT_EQ);

private ParquetFilters() {
}

public static boolean isSupportedFilter(Expression expr) {
if (expr.op().equals(Operation.AND)) {
return isSupportedFilter(((And) expr).left()) &&
isSupportedFilter(((And) expr).right());
} else if (expr.op().equals(Operation.OR)) {
return isSupportedFilter(((Or) expr).left()) &&
isSupportedFilter(((Or) expr).right());
} else if (expr.op().equals(Operation.NOT)) {
return isSupportedFilter(((Not) expr).child());
} else {
return isSupportedOp(expr);
}
}

private static boolean isSupportedOp(Expression expr) {
return SUPPORTED_OPS.contains(expr.op());
}

static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) {
FilterPredicate pred = ExpressionVisitors.visit(expr, new ConvertFilterToParquet(schema, caseSensitive));
// TODO: handle AlwaysFalse.INSTANCE
@@ -231,7 +263,7 @@ private static class AlwaysFalse implements FilterPredicate {

@Override
public <R> R accept(Visitor<R> visitor) {
throw new UnsupportedOperationException("AlwaysTrue is a placeholder only");
throw new UnsupportedOperationException("AlwaysFalse is a placeholder only");
}
}
}
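
For reference, a small usage sketch of the new guard; ParquetFiltersExample is a hypothetical class in the same package (ParquetFilters is package-private), and the column names and literals are made up:

package org.apache.iceberg.parquet;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class ParquetFiltersExample {
  public static void main(String[] args) {
    // EQ, LT and the other comparison ops are in SUPPORTED_OPS, so this AND can be pushed down.
    Expression supported = Expressions.and(
        Expressions.equal("id", 5),
        Expressions.lessThan("ts", 100L));
    System.out.println(ParquetFilters.isSupportedFilter(supported));    // true

    // STARTS_WITH is not in SUPPORTED_OPS, so no Parquet record filter is configured for such a read.
    Expression unsupported = Expressions.startsWith("name", "ab");
    System.out.println(ParquetFilters.isSupportedFilter(unsupported));  // false
  }
}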
parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -81,11 +81,11 @@ public CloseableIterator<T> iterator() {

private static class FileIterator<T> implements CloseableIterator<T> {
private final ParquetFileReader reader;
private final boolean[] shouldSkip;
private final ParquetValueReader<T> model;
private final long totalValues;
private final boolean reuseContainers;
private final long[] rowGroupsStartRowPos;
private final boolean hasRecordFilter;

private int nextRowGroup = 0;
private long nextRowGroupStart = 0;
@@ -94,11 +94,11 @@ private static class FileIterator<T> implements CloseableIterator<T> {

FileIterator(ReadConf<T> conf) {
this.reader = conf.reader();
this.shouldSkip = conf.shouldSkip();
this.model = conf.model();
this.totalValues = conf.totalValues();
this.reuseContainers = conf.reuseContainers();
this.rowGroupsStartRowPos = conf.startRowPositions();
this.hasRecordFilter = conf.hasRecordFilter();
}

@Override
@@ -123,14 +123,14 @@ public T next() {
}

private void advance() {
while (shouldSkip[nextRowGroup]) {
nextRowGroup += 1;
reader.skipNextRowGroup();
}

PageReadStore pages;
try {
pages = reader.readNextRowGroup();
// Because of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup(); only call it when a record filter is set
if (hasRecordFilter) {
pages = reader.readNextFilteredRowGroup();
} else {
pages = reader.readNextRowGroup();
}
} catch (IOException e) {
throw new RuntimeIOException(e);
}
54 changes: 21 additions & 33 deletions parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -54,7 +54,6 @@ class ReadConf<T> {
private final ParquetValueReader<T> model;
private final VectorizedReader<T> vectorizedModel;
private final List<BlockMetaData> rowGroups;
private final boolean[] shouldSkip;
private final long totalValues;
private final boolean reuseContainers;
private final Integer batchSize;
@@ -85,34 +84,28 @@ class ReadConf<T> {
this.projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
}

// ParquetFileReader applies its filters (stats, dictionary, and in the future bloom filter) in the constructor,
// so getRowGroups() returns already-filtered row groups
this.rowGroups = reader.getRowGroups();
this.shouldSkip = new boolean[rowGroups.size()];

// Fetch the starting positions of all row groups to compute the row offsets of the filtered row groups
Map<Long, Long> offsetToStartPos = generateOffsetToStartPos();
this.startRowPositions = new long[rowGroups.size()];

ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
if (filter != null) {
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
}
Reviewer comment (Contributor):
This PR removes support for pushing down filters like startsWith and in?


long computedTotalValues = 0L;
for (int i = 0; i < shouldSkip.length; i += 1) {
// If a row group has no rows left after filtering, it is not included in rowGroups
for (int i = 0; i < rowGroups.size(); i += 1) {
BlockMetaData rowGroup = rowGroups.get(i);
startRowPositions[i] = offsetToStartPos.get(rowGroup.getStartingPos());
boolean shouldRead = filter == null || (
statsFilter.shouldRead(typeWithIds, rowGroup) &&
dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
this.shouldSkip[i] = !shouldRead;
if (shouldRead) {
computedTotalValues += rowGroup.getRowCount();
}
computedTotalValues += rowGroup.getRowCount();
}

this.totalValues = computedTotalValues;
// Because of PARQUET-1901, we cannot blindly call getFilteredRecordCount(); fall back to the summed row counts when there is no filter
if (filter != null) {
this.totalValues = reader.getFilteredRecordCount();
} else {
this.totalValues = computedTotalValues;
}
if (readerFunc != null) {
this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
this.vectorizedModel = null;
@@ -134,7 +127,6 @@ private ReadConf(ReadConf<T> toCopy) {
this.projection = toCopy.projection;
this.model = toCopy.model;
this.rowGroups = toCopy.rowGroups;
this.shouldSkip = toCopy.shouldSkip;
this.totalValues = toCopy.totalValues;
this.reuseContainers = toCopy.reuseContainers;
this.batchSize = toCopy.batchSize;
@@ -162,10 +154,6 @@ VectorizedReader<T> vectorizedModel() {
return vectorizedModel;
}

boolean[] shouldSkip() {
return shouldSkip;
}

private Map<Long, Long> generateOffsetToStartPos() {
try (ParquetFileReader fileReader = newReader(file, ParquetReadOptions.builder().build())) {
Map<Long, Long> offsetToStartPos = new HashMap<>();
@@ -208,6 +196,10 @@ ReadConf<T> copy() {
return new ReadConf<>(this);
}

boolean hasRecordFilter() {
return options.getRecordFilter() != null;
}

private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
try {
return ParquetFileReader.open(ParquetIO.file(file), options);
@@ -221,16 +213,12 @@ private List<Map<ColumnPath, ColumnChunkMetaData>> getColumnChunkMetadataForRowG
.map(columnDescriptor -> ColumnPath.get(columnDescriptor.getPath())).collect(Collectors.toSet());
ImmutableList.Builder<Map<ColumnPath, ColumnChunkMetaData>> listBuilder = ImmutableList.builder();
for (int i = 0; i < rowGroups.size(); i++) {
if (!shouldSkip[i]) {
BlockMetaData blockMetaData = rowGroups.get(i);
ImmutableMap.Builder<ColumnPath, ColumnChunkMetaData> mapBuilder = ImmutableMap.builder();
blockMetaData.getColumns().stream()
.filter(columnChunkMetaData -> projectedColumns.contains(columnChunkMetaData.getPath()))
.forEach(columnChunkMetaData -> mapBuilder.put(columnChunkMetaData.getPath(), columnChunkMetaData));
listBuilder.add(mapBuilder.build());
} else {
listBuilder.add(ImmutableMap.of());
}
BlockMetaData blockMetaData = rowGroups.get(i);
ImmutableMap.Builder<ColumnPath, ColumnChunkMetaData> mapBuilder = ImmutableMap.builder();
blockMetaData.getColumns().stream()
.filter(columnChunkMetaData -> projectedColumns.contains(columnChunkMetaData.getPath()))
.forEach(columnChunkMetaData -> mapBuilder.put(columnChunkMetaData.getPath(), columnChunkMetaData));
listBuilder.add(mapBuilder.build());
}
return listBuilder.build();
}
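
The body of generateOffsetToStartPos() is collapsed in the hunk above; the following is a plausible shape of it, reconstructed from the visible calls (a hedged sketch, not necessarily the PR's exact code):

// Open the file without any filter so that every row group is visible, then map each row
// group's starting file offset to the number of rows that precede it. The filtered reader
// can then translate its remaining row groups back into absolute row positions.
private Map<Long, Long> generateOffsetToStartPos() {
  try (ParquetFileReader fileReader = newReader(file, ParquetReadOptions.builder().build())) {
    Map<Long, Long> offsetToStartPos = new HashMap<>();
    long curRowCount = 0;
    for (BlockMetaData blockMetaData : fileReader.getRowGroups()) {
      offsetToStartPos.put(blockMetaData.getStartingPos(), curRowCount);
      curRowCount += blockMetaData.getRowCount();
    }
    return offsetToStartPos;
  } catch (IOException e) {
    throw new RuntimeIOException(e);
  }
}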
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
@@ -89,29 +89,28 @@ public CloseableIterator<T> iterator() {

private static class FileIterator<T> implements CloseableIterator<T> {
private final ParquetFileReader reader;
private final boolean[] shouldSkip;
private final VectorizedReader<T> model;
private final long totalValues;
private final int batchSize;
private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
private final boolean reuseContainers;
private final boolean hasRecordFilter;
private int nextRowGroup = 0;
private long nextRowGroupStart = 0;
private long valuesRead = 0;
private T last = null;

FileIterator(ReadConf conf) {
this.reader = conf.reader();
this.shouldSkip = conf.shouldSkip();
this.totalValues = conf.totalValues();
this.reuseContainers = conf.reuseContainers();
this.model = conf.vectorizedModel();
this.batchSize = conf.batchSize();
this.model.setBatchSize(this.batchSize);
this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
this.hasRecordFilter = conf.hasRecordFilter();
}


@Override
public boolean hasNext() {
return valuesRead < totalValues;
@@ -139,16 +138,18 @@ public T next() {
}

private void advance() {
while (shouldSkip[nextRowGroup]) {
nextRowGroup += 1;
reader.skipNextRowGroup();
}
PageReadStore pages;
try {
pages = reader.readNextRowGroup();
// Because of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup(); only call it when a record filter is set
if (hasRecordFilter) {
pages = reader.readNextFilteredRowGroup();
} else {
pages = reader.readNextRowGroup();
}
} catch (IOException e) {
throw new RuntimeIOException(e);
}

model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;
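
Finally, a hedged end-to-end sketch of how these changes are exercised through the public Parquet.read() builder; inputFile and projectedSchema are placeholders, and GenericParquetReaders is just one way to build value readers:

import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.parquet.Parquet;

// With a supported filter, the builder installs a Parquet record filter (useRecordFilter /
// withRecordFilter) and the iterators call readNextFilteredRowGroup(); with no filter, or an
// unsupported one such as startsWith, they fall back to readNextRowGroup().
CloseableIterable<Record> records = Parquet.read(inputFile)
    .project(projectedSchema)
    .filter(Expressions.greaterThanOrEqual("event_time", 1000L))
    .filterRecords(true)
    .caseSensitive(true)
    .createReaderFunc(fileType -> GenericParquetReaders.buildReader(projectedSchema, fileType))
    .build();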