From 1a4f9597853d4d05eafc9e2e102a3651403abe49 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E6=95=A2?=
Date: Tue, 21 Jun 2022 10:03:51 +0800
Subject: [PATCH 1/3] delete compaction optimization

Optimize the performance of delete compaction with Parquet bloom filters;
only the Parquet format is covered in this PR. Equality delete records whose
values cannot appear in the data file, according to the file's row-group
bloom filters, are dropped before the equality delete set is built.
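For context, a minimal sketch of how the new helpers are meant to fit
together for a single equality-delete record. The names mightMatchDataFile,
dataFile, deleteRecord and deleteSchema are illustrative placeholders rather
than identifiers from this patch, the single equality column "data" follows
the test below, and the types are the ones imported in DeleteFilter.java:

    // illustrative only, not part of the diff
    static boolean mightMatchDataFile(InputFile dataFile, Record deleteRecord, Schema deleteSchema) {
      // open the Parquet data file and collect one bloom filter reader per row group
      ParquetFileReader reader = ParquetUtil.openFile(dataFile);
      Map<BlockMetaData, BloomFilterReader> bloomFilters = ParquetUtil.getParquetBloomFilters(reader);
      MessageType fileSchema = reader.getFileMetaData().getSchema();

      // turn the delete record into an equality predicate, e.g. data = 'a'
      Expression filter = Expressions.equal("data", deleteRecord.getField("data"));

      // keep the delete record only if some row group might contain a matching row;
      // closing the reader is left to the caller
      ParquetBloomRowGroupFilter bloomFilter = new ParquetBloomRowGroupFilter(deleteSchema, filter, true);
      return bloomFilters.entrySet().stream()
          .anyMatch(e -> bloomFilter.shouldRead(fileSchema, e.getKey(), e.getValue()));
    }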
---
 .../org/apache/iceberg/data/DeleteFilter.java | 81 +++++++++++++++++++
 .../apache/iceberg/data/DeleteReadTests.java  | 45 ++++++++++-
 .../org/apache/iceberg/data/FileHelpers.java  | 15 +++-
 .../apache/iceberg/parquet/ParquetUtil.java   | 19 +++++
 4 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 3bb44a1e4e0f..779e0e34534f 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -36,11 +36,15 @@
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.Deletes;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetBloomRowGroupFilter;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -49,10 +53,15 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructProjection;
+import org.apache.parquet.hadoop.BloomFilterReader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.schema.MessageType;
 
 public abstract class DeleteFilter<T> {
   private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
@@ -133,10 +142,19 @@ public CloseableIterable<T> filter(CloseableIterable<T> records) {
   private List<Predicate<T>> applyEqDeletes() {
     List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
+    Map<BlockMetaData, BloomFilterReader> parquetBloomFilterReader = Maps.newHashMap();
+    ParquetFileReader parquetReader = null;
+    Predicate<Record> isInBloomFilter = null;
     if (eqDeletes.isEmpty()) {
       return isInDeleteSets;
     }
 
+    // load bloom filter readers from the data file
+    if (filePath.endsWith(".parquet")) {
+      parquetReader = ParquetUtil.openFile(getInputFile(filePath));
+      parquetBloomFilterReader.putAll(ParquetUtil.getParquetBloomFilters(parquetReader));
+    }
+
     Multimap<Set<Integer>, DeleteFile> filesByDeleteIds =
         Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
     for (DeleteFile delete : eqDeletes) {
       filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
@@ -148,6 +166,12 @@
 
       Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
 
+      if (filePath.endsWith(".parquet") && parquetReader != null) {
+        MessageType fileSchema = parquetReader.getFileMetaData().getSchema();
+        isInBloomFilter =
+            record -> findInParquetBloomFilter(record, deleteSchema, fileSchema, parquetBloomFilterReader);
+      }
+
       // a projection to select and reorder fields of the file schema to match the delete rows
       StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
 
@@ -158,6 +182,11 @@ private List<Predicate<T>> applyEqDeletes() {
       CloseableIterable<Record> records = CloseableIterable.transform(
           CloseableIterable.concat(deleteRecords), Record::copy);
 
+      // apply bloom filter on delete records
+      if (isInBloomFilter != null) {
+        records = CloseableIterable.filter(records, isInBloomFilter);
+      }
+
       StructLikeSet deleteSet = Deletes.toEqualitySet(
           CloseableIterable.transform(
               records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
@@ -170,6 +199,58 @@
     return isInDeleteSets;
   }
 
+  private boolean findInParquetBloomFilter(
+      Record record,
+      Schema deleteSchema,
+      MessageType fileSchema,
+      Map<BlockMetaData, BloomFilterReader> parquetBloomFilterReader) {
+    if (record.size() == 0) {
+      return true;
+    }
+    // build filter by record values
+    Expression filter = buildFilter(record, deleteSchema);
+
+    ParquetBloomRowGroupFilter bloomFilter = new ParquetBloomRowGroupFilter(deleteSchema, filter, true);
+    for (Map.Entry<BlockMetaData, BloomFilterReader> entry : parquetBloomFilterReader.entrySet()) {
+      boolean shouldRead = bloomFilter.shouldRead(fileSchema, entry.getKey(), entry.getValue());
+      if (shouldRead) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private Expression buildFilter(Record record, Schema schema) {
+    Expression filter = Expressions.alwaysTrue();
+    for (Types.NestedField field : schema.columns()) {
+      Object value = getRecordValue(record, field);
+      if (value == null) {
+        continue;
+      }
+      filter = Expressions.and(filter, Expressions.equal(field.name(), value));
+    }
+    return filter;
+  }
+
+  private Object getRecordValue(Record record, Types.NestedField field) {
+    Type type = field.type();
+    switch (type.toString()) {
+      case "date":
+        return Literal.of(record.getField(field.name()).toString()).to(Types.DateType.get()).value();
+      case "time":
+        return Literal.of(record.getField(field.name()).toString()).to(Types.TimeType.get()).value();
+      case "timestamp":
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return Literal.of(record.getField(field.name()).toString()).to(Types.TimestampType.withZone()).value();
+        } else {
+          return Literal.of(record.getField(field.name()).toString()).to(Types.TimestampType.withoutZone()).value();
+        }
+      default:
+        return record.getField(field.name());
+    }
+  }
+
   public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
     // Predicate to test whether a row has been deleted by equality deletions.
     Predicate<T> deletedRows = applyEqDeletes().stream()
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index 75e1657d54e6..13d016a62856 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -29,6 +29,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers.Row;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -84,7 +85,9 @@ public void writeTestDataFile() throws IOException {
     this.tableName = "test";
     this.table = createTable(tableName, SCHEMA, SPEC);
     this.records = Lists.newArrayList();
-
+    table.updateProperties()
+        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "data", "true")
+        .commit();
     // records all use IDs that are in bucket id_bucket=0
     GenericRecord record = GenericRecord.create(table.schema());
     records.add(record.copy("id", 29, "data", "a"));
@@ -435,6 +438,46 @@ public void testEqualityDeleteByNull() throws IOException {
     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
 
+  @Test
+  public void testEqualityDeleteWithBloomFilter() throws IOException {
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "h") // no matching row; filtered out by the bloom filter
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89);
+    StructLikeSet actual = rowSet(tableName, table, "*");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+
+    List<Record> filteredDataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "i"),
+        dataDelete.copy("data", "j"),
+        dataDelete.copy("data", "k")
+    );
+
+    DeleteFile filteredEqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), filteredDataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(filteredEqDeletes)
+        .commit();
+
+    expected = rowSetWithoutIds(table, records, 29, 89);
+    actual = rowSet(tableName, table, "*");
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
   private StructLikeSet selectColumns(StructLikeSet rows, String... columns) {
     Schema projection = table.schema().select(columns);
     StructLikeSet set = StructLikeSet.create(projection.asStruct());
diff --git a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
index dfcb75b04b01..a8fedf172220 100644
--- a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -39,12 +39,14 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
 
 public class FileHelpers {
   private FileHelpers() {
@@ -96,7 +98,7 @@
   public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
     FileFormat format = defaultFormat(table.properties());
     GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());
-
+    factory.setAll(bloomFilterProperties(table.properties()));
     FileAppender<Record> writer = factory.newAppender(out, format);
     try (Closeable toClose = writer) {
       writer.addAll(rows);
@@ -139,4 +141,15 @@ private static FileFormat defaultFormat(Map<String, String> properties) {
     String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
     return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
   }
+
+  private static Map<String, String> bloomFilterProperties(Map<String, String> properties) {
+    Map<String, String> bloomFilterColumns = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      String key = entry.getKey();
+      if (key.startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX)) {
+        bloomFilterColumns.put(key, entry.getValue());
+      }
+    }
+    return bloomFilterColumns;
+  }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 51a40eaf9487..be3b1cc3f209 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.parquet;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Iterator;
@@ -56,6 +57,7 @@
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.BloomFilterReader;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -374,4 +376,21 @@ public static boolean isIntType(PrimitiveType primitiveType) {
     }
     return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32;
   }
+
+  public static Map<BlockMetaData, BloomFilterReader> getParquetBloomFilters(ParquetFileReader reader) {
+    Map<BlockMetaData, BloomFilterReader> bloomFilterReaderMap = Maps.newHashMap();
+    for (BlockMetaData rowGroup : reader.getRowGroups()) {
+      bloomFilterReaderMap.put(rowGroup, reader.getBloomFilterDataReader(rowGroup));
+    }
+    return bloomFilterReaderMap;
+  }
+
+  public static ParquetFileReader openFile(org.apache.iceberg.io.InputFile input) {
+    try {
+      org.apache.parquet.io.InputFile parquetInput = ParquetIO.file(input);
+      return ParquetFileReader.open(parquetInput);
+    } catch (IOException e) {
+      throw new UncheckedIOException("failed to open parquet file reader!", e);
+    }
+  }
 }

From 82c48aa482ab6d7e786eed4213657dec28a7ed42 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E6=95=A2?=
Date: Tue, 21 Jun 2022 16:48:48 +0800
Subject: [PATCH 2/3] Update DeleteFilter.java

---
 data/src/main/java/org/apache/iceberg/data/DeleteFilter.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 779e0e34534f..d7415690bc12 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -204,12 +204,11 @@ private boolean findInParquetBloomFilter(
       Schema deleteSchema,
       MessageType fileSchema,
       Map<BlockMetaData, BloomFilterReader> parquetBloomFilterReader) {
-    if (record.size() == 0) {
+    if (record.size() == 0 || parquetBloomFilterReader.isEmpty()) {
       return true;
     }
     // build filter by record values
     Expression filter = buildFilter(record, deleteSchema);
-
     ParquetBloomRowGroupFilter bloomFilter = new ParquetBloomRowGroupFilter(deleteSchema, filter, true);
     for (Map.Entry<BlockMetaData, BloomFilterReader> entry : parquetBloomFilterReader.entrySet()) {
       boolean shouldRead = bloomFilter.shouldRead(fileSchema, entry.getKey(), entry.getValue());

From ce3393c58e01542be90489aa39992ed237b8ae64 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E6=95=A2?=
Date: Tue, 21 Jun 2022 19:24:22 +0800
Subject: [PATCH 3/3] close parquet reader

Close the Parquet file reader once the equality delete predicates have been
built.

---
 .../java/org/apache/iceberg/data/DeleteFilter.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index d7415690bc12..31fb660b4c61 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.data;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -195,7 +197,13 @@ record -> findInParquetBloomFilter(record, deleteSchema, fileSchema, parquetBloo
       Predicate<T> isInDeleteSet =
           record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
       isInDeleteSets.add(isInDeleteSet);
     }
-
+    try {
+      if (parquetReader != null) {
+        parquetReader.close();
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("failed to close parquet file reader!", e);
+    }
     return isInDeleteSets;
   }