90 changes: 89 additions & 1 deletion data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -19,6 +19,8 @@

package org.apache.iceberg.data;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -36,11 +38,15 @@
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetBloomRowGroupFilter;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -49,10 +55,15 @@
import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
import org.apache.parquet.hadoop.BloomFilterReader;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;

public abstract class DeleteFilter<T> {
private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
@@ -133,10 +144,19 @@ public CloseableIterable<T> filter(CloseableIterable<T> records) {

private List<Predicate<T>> applyEqDeletes() {
List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
Map<BlockMetaData, BloomFilterReader> parquetBloomFilterReader = Maps.newHashMap();
ParquetFileReader parquetReader = null;
Predicate<Record> isInBloomFilter = null;
if (eqDeletes.isEmpty()) {
return isInDeleteSets;
}

// load bloom filter readers from the data file
if (filePath.endsWith(".parquet")) {
Collaborator: Do we want to check whether the bloom filter is turned on, to avoid reading the footer if it is not?

Author: You mean checking whether the bloom filter is enabled via the table properties, right? But the bloom filter properties may have been updated, so the bloom filter in the current file may not match the table properties.
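A minimal sketch of the reviewer's suggestion, assuming DeleteFilter had access to the table's property map (today it only receives the file path, which is part of the author's objection); the helper name is hypothetical:

// Hypothetical helper (sketch only): skip reading the Parquet footer unless at least
// one column has bloom filters enabled in the table properties.
private static boolean parquetBloomFilterEnabled(Map<String, String> tableProperties) {
  return tableProperties.keySet().stream()
      .anyMatch(key -> key.startsWith(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX));
}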

parquetReader = ParquetUtil.openFile(getInputFile(filePath));
Collaborator: Can we use try-with-resources here?

Author: That might be a big change. I want to keep this reader open during the iteration over the delete files, and considering the ORC format, the delete iteration would need to be encapsulated in a new function. Any advice for this change?
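One possible shape for that refactoring, sketched under the assumption that the current loop body is extracted into a hypothetical applyEqDeletesInternal(ParquetFileReader) helper, so the reader stays open for the whole delete-file iteration and is still closed deterministically:

private List<Predicate<T>> applyEqDeletes() {
  if (eqDeletes.isEmpty()) {
    return Lists.newArrayList();
  }

  if (filePath.endsWith(".parquet")) {
    // try-with-resources closes the footer reader once every delete file has been processed
    try (ParquetFileReader reader = ParquetUtil.openFile(getInputFile(filePath))) {
      return applyEqDeletesInternal(reader);  // hypothetical helper holding the existing loop
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close Parquet file reader", e);
    }
  }

  // ORC and other formats take the existing path without bloom filter pruning
  return applyEqDeletesInternal(null);
}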

parquetBloomFilterReader.putAll(ParquetUtil.getParquetBloomFilters(parquetReader));
}

Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
for (DeleteFile delete : eqDeletes) {
filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
@@ -148,6 +168,12 @@ private List<Predicate<T>> applyEqDeletes() {

Schema deleteSchema = TypeUtil.select(requiredSchema, ids);

if (filePath.endsWith(".parquet") && parquetReader != null) {
Collaborator: I think we can change the ctor parameter from String to DataFile, and then here we can check file.format().

Author: Yes, you are right, but the DataFile is not passed into DeleteFilter as a parameter; it was changed to filePath in #4381.

Collaborator: I see. Any concern if we change it to DataFile?

Author: The constructor parameter was changed to support merge-on-read in Trino. Trino currently wraps a dummy FileScanTask around the data file; the author of that change wants to remove the FileScanTask implementation in Trino and use the filePath parameter instead. If we change it back to DataFile, compatibility becomes a problem.
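For reference, a sketch of what the format check could look like if the constructor accepted a DataFile rather than a String path; dataFile is hypothetical here, and FileFormat would need to be imported:

// Sketch only: use the file's declared format instead of matching on the ".parquet" suffix.
if (dataFile.format() == FileFormat.PARQUET) {
  parquetReader = ParquetUtil.openFile(getInputFile(dataFile.path().toString()));
  parquetBloomFilterReader.putAll(ParquetUtil.getParquetBloomFilters(parquetReader));
}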

MessageType fileSchema = parquetReader.getFileMetaData().getSchema();
isInBloomFilter =
record -> findInParquetBloomFilter(record, deleteSchema, fileSchema, parquetBloomFilterReader);
}

// a projection to select and reorder fields of the file schema to match the delete rows
StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);

@@ -158,6 +184,11 @@
CloseableIterable<Record> records = CloseableIterable.transform(
CloseableIterable.concat(deleteRecords), Record::copy);

// apply the bloom filter to the delete records
if (isInBloomFilter != null) {
records = CloseableIterable.filter(records, isInBloomFilter);
}

StructLikeSet deleteSet = Deletes.toEqualitySet(
CloseableIterable.transform(
records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
@@ -166,10 +197,67 @@
Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
isInDeleteSets.add(isInDeleteSet);
}

try {
if (parquetReader != null) {
parquetReader.close();
}
} catch (IOException e) {
throw new UncheckedIOException("failed to close parquet file reader!", e);
}
return isInDeleteSets;
}

private boolean findInParquetBloomFilter(
Record record,
Schema deleteSchema,
MessageType fileSchema,
Map<BlockMetaData, BloomFilterReader> parquetBloomFilterReader) {
if (record.size() == 0 || parquetBloomFilterReader.isEmpty()) {
return true;
}
// build an equality filter from the record's values
Expression filter = buildFilter(record, deleteSchema);
ParquetBloomRowGroupFilter bloomFilter = new ParquetBloomRowGroupFilter(deleteSchema, filter, true);
for (Map.Entry<BlockMetaData, BloomFilterReader> entry : parquetBloomFilterReader.entrySet()) {
boolean shouldRead = bloomFilter.shouldRead(fileSchema, entry.getKey(), entry.getValue());
if (shouldRead) {
return true;
}
}

return false;
}

private Expression buildFilter(Record record, Schema schema) {
Expression filter = Expressions.alwaysTrue();
for (Types.NestedField field : schema.columns()) {
Object value = getRecordValue(record, field);
if (value == null) {
continue;
}
filter = Expressions.and(filter, Expressions.equal(field.name(), value));
}
return filter;
}

private Object getRecordValue(Record record, Types.NestedField field) {
Type type = field.type();
switch (type.toString()) {
case "date":
return Literal.of(record.getField(field.name()).toString()).to(Types.DateType.get()).value();
case "time":
return Literal.of(record.getField(field.name()).toString()).to(Types.TimeType.get()).value();
case "timestamp":
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
return Literal.of(record.getField(field.name()).toString()).to(Types.TimestampType.withZone()).value();
} else {
return Literal.of(record.getField(field.name()).toString()).to(Types.TimestampType.withoutZone()).value();
}
default:
return record.getField(field.name());
}
}

public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
// Predicate to test whether a row has been deleted by equality deletions.
Predicate<T> deletedRows = applyEqDeletes().stream()
45 changes: 44 additions & 1 deletion data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -29,6 +29,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -84,7 +85,9 @@ public void writeTestDataFile() throws IOException {
this.tableName = "test";
this.table = createTable(tableName, SCHEMA, SPEC);
this.records = Lists.newArrayList();

table.updateProperties()
.set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "data", "true")
.commit();
// records all use IDs that are in bucket id_bucket=0
GenericRecord record = GenericRecord.create(table.schema());
records.add(record.copy("id", 29, "data", "a"));
Expand Down Expand Up @@ -435,6 +438,46 @@ public void testEqualityDeleteByNull() throws IOException {
Assert.assertEquals("Table should contain expected rows", expected, actual);
}

@Test
public void testEqualityDeleteWithBloomFilter() throws IOException {
Schema deleteRowSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteRowSchema);
List<Record> dataDeletes = Lists.newArrayList(
dataDelete.copy("data", "a"), // id = 29
dataDelete.copy("data", "d"), // id = 89
dataDelete.copy("data", "h") // will be filtered
);

DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema);

table.newRowDelta()
.addDeletes(eqDeletes)
.commit();

StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89);
StructLikeSet actual = rowSet(tableName, table, "*");

Assert.assertEquals("Table should contain expected rows", expected, actual);

List<Record> filteredDataDeletes = Lists.newArrayList(
dataDelete.copy("data", "i"),
dataDelete.copy("data", "j"),
dataDelete.copy("data", "k")
);

DeleteFile filteredEqDeletes = FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), Row.of(0), filteredDataDeletes, deleteRowSchema);

table.newRowDelta()
.addDeletes(filteredEqDeletes)
.commit();

expected = rowSetWithoutIds(table, records, 29, 89);
actual = rowSet(tableName, table, "*");
Assert.assertEquals("Table should contain expected rows", expected, actual);
}

private StructLikeSet selectColumns(StructLikeSet rows, String... columns) {
Schema projection = table.schema().select(columns);
StructLikeSet set = StructLikeSet.create(projection.asStruct());
15 changes: 14 additions & 1 deletion data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -39,12 +39,14 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;

public class FileHelpers {
private FileHelpers() {
@@ -96,7 +98,7 @@ public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
FileFormat format = defaultFormat(table.properties());
GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());

factory.setAll(bloomFilterProperties(table.properties()));
FileAppender<Record> writer = factory.newAppender(out, format);
try (Closeable toClose = writer) {
writer.addAll(rows);
@@ -139,4 +141,15 @@ private static FileFormat defaultFormat(Map<String, String> properties) {
String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}

private static Map<String, String> bloomFilterProperties(Map<String, String> properties) {
Map<String, String> bloomFilterColumns = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
if (key.startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX)) {
bloomFilterColumns.put(key, entry.getValue());
}
}
return bloomFilterColumns;
}
}
19 changes: 19 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.parquet;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
@@ -56,6 +57,7 @@
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.BloomFilterReader;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -374,4 +376,21 @@ public static boolean isIntType(PrimitiveType primitiveType) {
}
return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32;
}

public static Map<BlockMetaData, BloomFilterReader> getParquetBloomFilters(ParquetFileReader reader) {
Map<BlockMetaData, BloomFilterReader> bloomFilterReaderMap = Maps.newHashMap();
for (BlockMetaData rowGroup : reader.getRowGroups()) {
bloomFilterReaderMap.put(rowGroup, reader.getBloomFilterDataReader(rowGroup));
}
return bloomFilterReaderMap;
}

public static ParquetFileReader openFile(org.apache.iceberg.io.InputFile input) {
try {
org.apache.parquet.io.InputFile parquetInput = ParquetIO.file(input);
return ParquetFileReader.open(parquetInput);
} catch (IOException e) {
throw new UncheckedIOException("failed to open parquet file reader!", e);
}
}
}