23 changes: 23 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -77,6 +77,7 @@
import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.CloseableIterable;
@@ -113,10 +114,14 @@
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Parquet {
private Parquet() {}

private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);

private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
Sets.newHashSet(
"parquet.read.filter",
@@ -1160,6 +1165,14 @@ public <D> CloseableIterable<D> build() {
optionsBuilder.withDecryption(fileDecryptionProperties);
}

if (filter != null
&& !filter.equals(Expressions.alwaysTrue())
&& ParquetFilters.isSupportedFilter(filter)) {
optionsBuilder.useRecordFilter(filterRecords);
optionsBuilder.withRecordFilter(
ParquetFilters.convert(getSchemaFromFile(), filter, caseSensitive));
}

ParquetReadOptions options = optionsBuilder.build();

if (batchedReaderFunc != null) {
@@ -1254,6 +1267,16 @@ public <D> CloseableIterable<D> build() {

return new ParquetIterable<>(builder);
}

private Schema getSchemaFromFile() {
MessageType type;
try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) {
type = schemaReader.getFileMetaData().getSchema();
} catch (IOException e) {
throw new RuntimeIOException(e);
}
return ParquetSchemaUtil.convert(type);
}
}

private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
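
For context (not part of this diff): a rough sketch of how a caller could exercise the new pushdown path through the existing read builder. The InputFile "file", the Iceberg Schema "schema", and the use of GenericParquetReaders are assumptions for illustration, not code from this change.

    // Hypothetical usage: both predicates use operations in SUPPORTED_OPS, so the builder
    // converts the expression with ParquetFilters.convert(...) and hands it to parquet-mr
    // as a record filter instead of relying only on row-group pruning.
    Expression rowFilter =
        Expressions.and(
            Expressions.greaterThanOrEqual("id", 100),   // GT_EQ: supported
            Expressions.notNull("data"));                // NOT_NULL: supported

    CloseableIterable<Record> records =
        Parquet.read(file)
            .project(schema)
            .filter(rowFilter)
            .filterRecords(true)      // maps to ParquetReadOptions.useRecordFilter(...)
            .caseSensitive(true)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
            .build();
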
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -19,7 +19,9 @@
package org.apache.iceberg.parquet;

import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.And;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
@@ -28,7 +30,9 @@
import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.Not;
import org.apache.iceberg.expressions.Or;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -37,6 +41,17 @@

class ParquetFilters {

private static final Set<Operation> SUPPORTED_OPS =
ImmutableSet.of(
Operation.IS_NULL,
Operation.NOT_NULL,
Operation.EQ,
Operation.NOT_EQ,
Operation.GT,
Operation.GT_EQ,
Operation.LT,
Operation.LT_EQ);

private ParquetFilters() {}

static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) {
@@ -173,6 +188,22 @@ public <T> FilterPredicate predicate(UnboundPredicate<T> pred) {
}
}

public static boolean isSupportedFilter(Expression expr) {
if (expr.op().equals(Operation.AND)) {
return isSupportedFilter(((And) expr).left()) && isSupportedFilter(((And) expr).right());
} else if (expr.op().equals(Operation.OR)) {
return isSupportedFilter(((Or) expr).left()) && isSupportedFilter(((Or) expr).right());
} else if (expr.op().equals(Operation.NOT)) {
return isSupportedFilter(((Not) expr).child());
} else {
return isSupportedOp(expr);
}
}

private static boolean isSupportedOp(Expression expr) {
return SUPPORTED_OPS.contains(expr.op());
}

@SuppressWarnings("checkstyle:MethodTypeParameterName")
private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
FilterPredicate pred(Operation op, COL col, C value) {
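
To illustrate the new check (a sketch only, callable from within the org.apache.iceberg.parquet package since the class is package-private): isSupportedFilter accepts AND/OR/NOT trees whose leaves are in SUPPORTED_OPS and rejects everything else, which keeps unsupported predicates on the existing row-group filtering path.

    // Accepted: EQ and LT_EQ leaves under an AND.
    ParquetFilters.isSupportedFilter(
        Expressions.and(Expressions.equal("id", 5), Expressions.lessThanOrEqual("ts", 1000L)));  // true

    // Rejected: STARTS_WITH and IN are not in SUPPORTED_OPS, so no record filter is pushed down.
    ParquetFilters.isSupportedFilter(Expressions.startsWith("data", "abc"));  // false
    ParquetFilters.isSupportedFilter(Expressions.in("id", 1, 2, 3));          // false
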
66 changes: 42 additions & 24 deletions parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -41,6 +41,8 @@
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Configuration for Parquet readers.
@@ -61,6 +63,7 @@ class ReadConf<T> {
private final Integer batchSize;
private final long[] startRowPositions;

private static final Logger LOG = LoggerFactory.getLogger(ReadConf.class);
// List of column chunk metadata for each row group
private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;

@@ -100,34 +103,45 @@ class ReadConf<T> {
// Fetch all row groups starting positions to compute the row offsets of the filtered row groups
Map<Long, Long> offsetToStartPos = generateOffsetToStartPos(expectedSchema);

ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
ParquetBloomRowGroupFilter bloomFilter = null;
if (filter != null) {
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
bloomFilter = new ParquetBloomRowGroupFilter(expectedSchema, filter, caseSensitive);
}

long computedTotalValues = 0L;
for (int i = 0; i < shouldSkip.length; i += 1) {
BlockMetaData rowGroup = rowGroups.get(i);
startRowPositions[i] =
offsetToStartPos == null ? 0 : offsetToStartPos.get(rowGroup.getStartingPos());
boolean shouldRead =
filter == null
|| (statsFilter.shouldRead(typeWithIds, rowGroup)
&& dictFilter.shouldRead(
typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup))
&& bloomFilter.shouldRead(
typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup)));
this.shouldSkip[i] = !shouldRead;
if (shouldRead) {
computedTotalValues += rowGroup.getRowCount();
LOG.info("Record filter pushed down: {}", options.getRecordFilter());

if (options.getRecordFilter() == null) {
LOG.warn("Record filter not pushed down; falling back to row group filtering");

ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
ParquetBloomRowGroupFilter bloomFilter = null;
if (filter != null) {
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
bloomFilter = new ParquetBloomRowGroupFilter(expectedSchema, filter, caseSensitive);
}

for (int i = 0; i < shouldSkip.length; i += 1) {
BlockMetaData rowGroup = rowGroups.get(i);
startRowPositions[i] =
offsetToStartPos == null ? 0 : offsetToStartPos.get(rowGroup.getStartingPos());
boolean shouldRead =
filter == null
|| (statsFilter.shouldRead(typeWithIds, rowGroup)
&& dictFilter.shouldRead(
typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup))
&& bloomFilter.shouldRead(
typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup)));
this.shouldSkip[i] = !shouldRead;
if (shouldRead) {
computedTotalValues += rowGroup.getRowCount();
}
}
}

this.totalValues = computedTotalValues;
if (options.getRecordFilter() != null) {
this.totalValues = reader.getFilteredRecordCount();
} else {
this.totalValues = computedTotalValues;
}

if (readerFunc != null) {
this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
this.vectorizedModel = null;
@@ -158,6 +172,10 @@ private ReadConf(ReadConf<T> toCopy) {
this.startRowPositions = toCopy.startRowPositions;
}

boolean hasRecordFilter() {
return options.getRecordFilter() != null;
}

ParquetFileReader reader() {
if (reader != null) {
reader.setRequestedSchema(projection);
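
For reference (outside this diff), a minimal standalone sketch of the parquet-mr calls the new path relies on; the file path and column name are hypothetical, and HadoopInputFile here is parquet-mr's org.apache.parquet.hadoop.util.HadoopInputFile, not Iceberg's. A FilterCompat.Filter set on ParquetReadOptions is what makes getFilteredRecordCount() and readNextFilteredRowGroup() return filtered results; without it, the reader keeps relying on the pre-existing Iceberg row-group filters above.

    // Plain parquet-mr usage showing the reader-side counterpart of the pushdown.
    FilterCompat.Filter recordFilter =
        FilterCompat.get(FilterApi.eq(FilterApi.intColumn("id"), 5));

    ParquetReadOptions options =
        ParquetReadOptions.builder()
            .useRecordFilter(true)
            .withRecordFilter(recordFilter)
            .build();

    try (ParquetFileReader reader =
        ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), new Configuration()), options)) {
      long filteredRows = reader.getFilteredRecordCount();      // rows left after applying the filter's row ranges
      PageReadStore pages = reader.readNextFilteredRowGroup();  // pages restricted to matching row ranges
    }
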
@@ -114,17 +114,19 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private long valuesRead = 0;
private T last = null;
private final long[] rowGroupsStartRowPos;
private final boolean hasRecordFilter;

FileIterator(ReadConf conf) {
this.reader = conf.reader();
this.shouldSkip = conf.shouldSkip();
this.totalValues = conf.totalValues();
this.totalValues = reader.getFilteredRecordCount();
this.reuseContainers = conf.reuseContainers();
this.model = conf.vectorizedModel();
this.batchSize = conf.batchSize();
this.model.setBatchSize(this.batchSize);
this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
this.rowGroupsStartRowPos = conf.startRowPositions();
this.hasRecordFilter = conf.hasRecordFilter();
}

@Override
@@ -160,7 +162,11 @@ private void advance() {
}
PageReadStore pages;
try {
pages = reader.readNextRowGroup();
if (hasRecordFilter) {
pages = reader.readNextFilteredRowGroup();
} else {
pages = reader.readNextRowGroup();
}
} catch (IOException e) {
throw new RuntimeIOException(e);
}
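
Condensed restatement (not additional code in the PR) of how the vectorized iterator now advances, assuming the FileIterator's existing shouldSkip/nextRowGroup bookkeeping: row groups rejected by ReadConf are skipped, and the remaining groups are read through the filtered path only when a record filter was actually pushed down.

    // Inside advance(): skip rejected row groups, then pick the read path.
    while (shouldSkip[nextRowGroup]) {
      nextRowGroup += 1;
      reader.skipNextRowGroup();
    }
    PageReadStore pages =
        hasRecordFilter ? reader.readNextFilteredRowGroup() : reader.readNextRowGroup();
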