From 6e4a25e7af491f2e1d8769df2d8467157d7b52c2 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Fri, 5 Nov 2021 15:30:30 -0700 Subject: [PATCH 01/21] Add interface PositionDeleteIndex to avoid expose RoaringBitmap in public method. --- .../arrow/vectorized/BaseBatchReader.java | 2 +- .../vectorized/VectorizedReaderBuilder.java | 8 + build.gradle | 2 + .../org/apache/iceberg/deletes/Deletes.java | 19 +++ .../apache/iceberg/util/TableScanUtil.java | 10 ++ .../org/apache/iceberg/data/DeleteFilter.java | 19 +++ .../apache/iceberg/data/DeleteReadTests.java | 39 ++++- spark/v3.0/build.gradle | 2 + spark/v3.2/build.gradle | 2 + .../spark/source/IcebergSourceBenchmark.java | 56 +++++++ ...gSourceFlatParquetDataDeleteBenchmark.java | 154 ++++++++++++++++++ .../vectorized/ColumnVectorWithFilter.java | 105 ++++++++++++ .../data/vectorized/ColumnarBatchReader.java | 80 ++++++++- .../vectorized/IcebergArrowColumnVector.java | 8 + .../VectorizedSparkParquetReaders.java | 46 ++++++ .../iceberg/spark/source/BaseDataReader.java | 4 + .../iceberg/spark/source/BatchDataReader.java | 32 +++- .../iceberg/spark/source/SparkBatchScan.java | 9 +- .../spark/source/TestSparkReaderDeletes.java | 49 +++++- versions.props | 3 +- 20 files changed, 637 insertions(+), 12 deletions(-) create mode 100644 spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java index 10f4eb0f1acf..76b5fd55d521 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java @@ -42,7 +42,7 @@ protected BaseBatchReader(List> readers) { } @Override - public final void setRowGroupInfo( + public void setRowGroupInfo( PageReadStore pageStore, Map metaData, long rowPosition) { for (VectorizedArrowReader reader : readers) { if (reader != null) { diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java index 2bace1e5d53d..5a9a174dc6ec 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java @@ -61,6 +61,10 @@ public VectorizedReaderBuilder( this.readerFactory = readerFactory; } + public Function>, VectorizedReader> readerFactory() { + return readerFactory; + } + @Override public VectorizedReader message( Types.StructType expected, MessageType message, @@ -94,6 +98,10 @@ public VectorizedReader message( reorderedFields.add(VectorizedArrowReader.nulls()); } } + return vectorizedReader(reorderedFields); + } + + protected VectorizedReader vectorizedReader(List> reorderedFields) { return readerFactory.apply(reorderedFields); } diff --git a/build.gradle b/build.gradle index e0097e943c07..eb06cbe4b4d7 100644 --- a/build.gradle +++ b/build.gradle @@ -215,6 +215,7 @@ project(':iceberg-core') { implementation "com.fasterxml.jackson.core:jackson-databind" implementation "com.fasterxml.jackson.core:jackson-core" implementation "com.github.ben-manes.caffeine:caffeine" + implementation "org.roaringbitmap:RoaringBitmap" 
compileOnly("org.apache.hadoop:hadoop-client") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.slf4j', module: 'slf4j-log4j12' @@ -252,6 +253,7 @@ project(':iceberg-data') { exclude group: 'it.unimi.dsi' exclude group: 'org.codehaus.jackson' } + implementation "org.roaringbitmap:RoaringBitmap" compileOnly "org.apache.avro:avro" diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 62154f7d6071..6b649e34e988 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -42,6 +42,7 @@ import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; import org.apache.iceberg.util.StructLikeSet; +import org.roaringbitmap.longlong.Roaring64Bitmap; public class Deletes { private static final Schema POSITION_DELETE_SCHEMA = new Schema( @@ -107,6 +108,24 @@ public static Set toPositionSet(CloseableIterable posDeletes) { } } + public static Roaring64Bitmap toPositionBitmap(CharSequence dataLocation, + List> deleteFiles) { + DataFileFilter locationFilter = new DataFileFilter<>(dataLocation); + List> positions = Lists.transform(deleteFiles, deletes -> + CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row))); + return toPositionBitmap(CloseableIterable.concat(positions)); + } + + public static Roaring64Bitmap toPositionBitmap(CloseableIterable posDeletes) { + try (CloseableIterable deletes = posDeletes) { + Roaring64Bitmap bitmap = new Roaring64Bitmap(); + deletes.forEach(bitmap::add); + return bitmap; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close position delete source", e); + } + } + public static CloseableIterable streamingFilter(CloseableIterable rows, Function rowToPosition, CloseableIterable posDeletes) { diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java index e90cb8f145c7..d8d2d68ba7dc 100644 --- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java @@ -23,6 +23,7 @@ import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.ContentFile; +import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -37,6 +38,15 @@ public static boolean hasDeletes(CombinedScanTask task) { return task.files().stream().anyMatch(TableScanUtil::hasDeletes); } + /** + * This is temporarily introduced since we plan to support pos-delete vectorized read first, then get to the + * equality-delete support. We will remove this method once both are supported. 
+ */ + public static boolean hasEqDeletes(CombinedScanTask task) { + return task.files().stream().anyMatch( + t -> t.deletes().stream().anyMatch(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))); + } + public static boolean hasDeletes(FileScanTask task) { return !task.deletes().isEmpty(); } diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index c4123d554d38..118ce0ac7477 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -55,6 +55,7 @@ import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; +import org.roaringbitmap.longlong.Roaring64Bitmap; public abstract class DeleteFilter { private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L; @@ -69,6 +70,8 @@ public abstract class DeleteFilter { private final Schema requiredSchema; private final Accessor posAccessor; + private Roaring64Bitmap deleteRowPositions = null; + protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; this.dataFile = task.file(); @@ -98,6 +101,10 @@ public Schema requiredSchema() { return requiredSchema; } + public boolean hasPosDeletes() { + return !posDeletes.isEmpty(); + } + Accessor posAccessor() { return posAccessor; } @@ -185,6 +192,18 @@ protected boolean shouldKeep(T item) { return remainingRowsFilter.filter(records); } + public Roaring64Bitmap deletedRowPositions() { + if (posDeletes.isEmpty()) { + return null; + } + + if (deleteRowPositions == null) { + List> deletes = Lists.transform(posDeletes, this::openPosDeletes); + deleteRowPositions = Deletes.toPositionBitmap(dataFile.path(), deletes); + } + return deleteRowPositions; + } + private CloseableIterable applyPosDeletes(CloseableIterable records) { if (posDeletes.isEmpty()) { return records; diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 69b0a572ad73..75e1657d54e6 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -75,9 +75,9 @@ public abstract class DeleteReadTests { protected String dateTableName = null; protected Table table = null; protected Table dateTable = null; - private List records = null; + protected List records = null; private List dateRecords = null; - private DataFile dataFile = null; + protected DataFile dataFile = null; @Before public void writeTestDataFile() throws IOException { @@ -298,6 +298,39 @@ public void testPositionDeletes() throws IOException { Assert.assertEquals("Table should contain expected rows", expected, actual); } + @Test + public void testMultiplePosDeleteFiles() throws IOException { + List> deletes = Lists.newArrayList( + Pair.of(dataFile.path(), 0L), // id = 29 + Pair.of(dataFile.path(), 3L) // id = 89 + ); + + Pair posDeletes = FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), Row.of(0), deletes); + + table.newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + deletes = Lists.newArrayList( + Pair.of(dataFile.path(), 6L) // id = 122 + ); + + posDeletes = FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), Row.of(0), deletes); + + table.newRowDelta() + 
.addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); + StructLikeSet actual = rowSet(tableName, table, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } + @Test public void testMixedPositionAndEqualityDeletes() throws IOException { Schema dataSchema = table.schema().select("data"); @@ -411,7 +444,7 @@ private StructLikeSet selectColumns(StructLikeSet rows, String... columns) { return set; } - private static StructLikeSet rowSetWithoutIds(Table table, List recordList, int... idsToRemove) { + protected static StructLikeSet rowSetWithoutIds(Table table, List recordList, int... idsToRemove) { Set deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove)); StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); recordList.stream() diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle index 7df4c62e5253..daff1fdd4dca 100644 --- a/spark/v3.0/build.gradle +++ b/spark/v3.0/build.gradle @@ -80,6 +80,8 @@ project(':iceberg-spark:iceberg-spark3') { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + implementation "org.roaringbitmap:RoaringBitmap" + testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' } diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index 2a2ff62f8d15..efc9a2e0108d 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -82,6 +82,8 @@ project(':iceberg-spark:iceberg-spark-3.2') { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + implementation "org.roaringbitmap:RoaringBitmap" + testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' // to make sure io.netty.buffer only comes from project(':iceberg-arrow') diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 2be6a8c538d3..41c58c397f10 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -22,18 +22,28 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -43,6 +53,8 @@ import org.openjdk.jmh.annotations.Scope; import 
org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Fork(1) @State(Scope.Benchmark) @@ -51,6 +63,9 @@ @BenchmarkMode(Mode.SingleShotTime) public abstract class IcebergSourceBenchmark { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceBenchmark.class); + private static final long TARGET_FILE_SIZE_IN_BYTES = 50L * 1024 * 1024; + private final Configuration hadoopConf = initHadoopConf(); private final Table table = initTable(); private SparkSession spark; @@ -187,4 +202,45 @@ protected void withTableProperties(Map props, Action action) { restoreProperties.commit(); } } + + protected void writePosDeletes(CharSequence path, long numRows, double percentage) throws IOException { + OutputFileFactory fileFactory = newFileFactory(); + SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table()) + .dataFileFormat(fileFormat()) + .build(); + + ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table().io(), + fileFormat(), TARGET_FILE_SIZE_IN_BYTES); + + PartitionSpec unpartitionedSpec = table().specs().get(0); + Set deletedPos = Sets.newHashSet(); + while (deletedPos.size() < numRows * percentage) { + deletedPos.add(ThreadLocalRandom.current().nextLong(numRows)); + } + + LOG.info("pos delete row count: {}", deletedPos.size()); + + PositionDelete positionDelete = PositionDelete.create(); + try (ClusteredPositionDeleteWriter closeableWriter = writer) { + for (Long pos : deletedPos) { + positionDelete.set(path, pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + } + } + + RowDelta rowDelta = table().newRowDelta(); + writer.result().deleteFiles().forEach(deleteFile -> rowDelta.addDeletes(deleteFile)); + rowDelta.validateDeletedFiles().commit(); + } + + private OutputFileFactory newFileFactory() { + return OutputFileFactory.builderFor(table(), 1, 1) + .format(fileFormat()) + .build(); + } + + protected FileFormat fileFormat() { + throw new UnsupportedOperationException("Unsupported file format"); + } } diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java new file mode 100644 index 000000000000..86a8f9d021a3 --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.source.IcebergSourceBenchmark; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for + * Iceberg. 5% of rows are deleted in each data file. + *

+ * This class uses a dataset with a flat schema. + * To run this benchmark for spark-3: + * + * ./gradlew :iceberg-spark:iceberg-spark3:jmh + * -PjmhIncludeRegex=IcebergSourceFlatParquetDataDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-delete-benchmark-result.txt + * + */ +public class IcebergSourceFlatParquetDataDeleteBenchmark extends IcebergSourceBenchmark { + + private static final int NUM_FILES = 50; + private static final int NUM_ROWS = 100 * 1000; + private static final double PERCENTAGE_DELETE_ROW = 0.05; + + @Setup + public void setupBenchmark() throws IOException { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readIcebergVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + private void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + Dataset df = spark().range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))")) + .withColumn("dateCol", date_add(current_date(), fileNum)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + appendAsFile(df); + + // add pos-deletes + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, PERCENTAGE_DELETE_ROW); + } + } + } + + @Override + protected final Table initTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(5, "decimalCol", Types.DecimalType.of(20, 5)), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + Map properties = Maps.newHashMap(); + properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); + properties.put(TableProperties.FORMAT_VERSION, "2"); + return tables.create(schema, partitionSpec, properties, newTableLocation()); + } + + @Override + protected Configuration initHadoopConf() { + return new Configuration(); + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java new file mode 100644 index 000000000000..7804c02f9042 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data.vectorized; + +import org.apache.iceberg.arrow.vectorized.VectorHolder; +import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.unsafe.types.UTF8String; + +public class ColumnVectorWithFilter extends IcebergArrowColumnVector { + private final int[] rowIdMapping; + + public ColumnVectorWithFilter(VectorHolder holder, int[] rowIdMapping) { + super(holder); + this.rowIdMapping = rowIdMapping; + } + + @Override + public boolean isNullAt(int rowId) { + return nullabilityHolder().isNullAt(rowIdMapping[rowId]) == 1; + } + + @Override + public boolean getBoolean(int rowId) { + return accessor().getBoolean(rowIdMapping[rowId]); + } + + @Override + public int getInt(int rowId) { + return accessor().getInt(rowIdMapping[rowId]); + } + + @Override + public long getLong(int rowId) { + return accessor().getLong(rowIdMapping[rowId]); + } + + @Override + public float getFloat(int rowId) { + return accessor().getFloat(rowIdMapping[rowId]); + } + + @Override + public double getDouble(int rowId) { + return accessor().getDouble(rowIdMapping[rowId]); + } + + @Override + public ColumnarArray getArray(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getArray(rowIdMapping[rowId]); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getDecimal(rowIdMapping[rowId], precision, scale); + } + + @Override + public UTF8String getUTF8String(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getUTF8String(rowIdMapping[rowId]); + } + + @Override + public byte[] getBinary(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getBinary(rowIdMapping[rowId]); + } + + public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) { + return holder.isDummy() ? 
+ new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) : + new ColumnVectorWithFilter(holder, rowIdMapping); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f71a6968099c..da67f6ce4794 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -20,12 +20,20 @@ package org.apache.iceberg.spark.data.vectorized; import java.util.List; +import java.util.Map; import org.apache.iceberg.arrow.vectorized.BaseBatchReader; import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Pair; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.roaringbitmap.longlong.Roaring64Bitmap; /** * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized read path. The @@ -33,11 +41,24 @@ * {@linkplain VectorizedArrowReader VectorReader(s)}. */ public class ColumnarBatchReader extends BaseBatchReader { + private DeleteFilter deletes = null; + private long rowStartPosInBatch = 0; public ColumnarBatchReader(List> readers) { super(readers); } + @Override + public void setRowGroupInfo(PageReadStore pageStore, Map metaData, + long rowPosition) { + super.setRowGroupInfo(pageStore, metaData, rowPosition); + this.rowStartPosInBatch = rowPosition; + } + + public void setDeleteFilter(DeleteFilter deleteFilter) { + this.deletes = deleteFilter; + } + @Override public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead); @@ -47,6 +68,8 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { closeVectors(); } + Pair rowIdMapping = rowIdMapping(numRowsToRead); + for (int i = 0; i < readers.length; i += 1) { vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead); int numRowsInVector = vectorHolders[i].numValues(); @@ -54,11 +77,62 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { numRowsInVector == numRowsToRead, "Number of rows in the vector %s didn't match expected %s ", numRowsInVector, numRowsToRead); - arrowColumnVectors[i] = - IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + + if (rowIdMapping == null) { + arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + } else { + int[] rowIdMap = rowIdMapping.first(); + Integer numRows = rowIdMapping.second(); + arrowColumnVectors[i] = ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows); + } } + + rowStartPosInBatch += numRowsToRead; ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors); - batch.setNumRows(numRowsToRead); + + if (rowIdMapping == null) { + batch.setNumRows(numRowsToRead); + } else { + Integer numRows 
= rowIdMapping.second(); + batch.setNumRows(numRows); + } return batch; } + + private Pair rowIdMapping(int numRows) { + if (deletes != null && deletes.hasPosDeletes()) { + return buildRowIdMapping(deletes.deletedRowPositions(), numRows); + } else { + return null; + } + } + + /** + * Build a row id mapping inside a batch, which skips delete rows. For example, if the 1st and 3rd rows are deleted in + * a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new num of rows is 3. + * @param deletedRowPositions a set of deleted row positions + * @param numRows the num of rows + * @return the mapping array and the new num of rows in a batch, null if no row is deleted + */ + private Pair buildRowIdMapping(Roaring64Bitmap deletedRowPositions, int numRows) { + if (deletedRowPositions == null) { + return null; + } + int[] rowIdMapping = new int[numRows]; + int originalRowId = 0; + int currentRowId = 0; + while (originalRowId < numRows) { + if (!deletedRowPositions.contains(originalRowId + rowStartPosInBatch)) { + rowIdMapping[currentRowId] = originalRowId; + currentRowId++; + } + originalRowId++; + } + + if (currentRowId == numRows) { + return null; + } else { + return Pair.of(rowIdMapping, currentRowId); + } + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java index 514eec84fe82..c5b7853d5353 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java @@ -48,6 +48,14 @@ public IcebergArrowColumnVector(VectorHolder holder) { this.accessor = ArrowVectorAccessors.getVectorAccessor(holder); } + protected ArrowVectorAccessor accessor() { + return accessor; + } + + protected NullabilityHolder nullabilityHolder() { + return nullabilityHolder; + } + @Override public void close() { accessor.close(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index b2d582352d74..4740a66ae47c 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -19,12 +19,17 @@ package org.apache.iceberg.spark.data.vectorized; +import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.catalyst.InternalRow; public class VectorizedSparkParquetReaders { @@ -49,4 +54,45 @@ public static ColumnarBatchReader buildReader( expectedSchema, fileSchema, setArrowValidityVector, idToConstant, ColumnarBatchReader::new)); } + + public static ColumnarBatchReader buildReader( + Schema expectedSchema, + MessageType fileSchema, + boolean setArrowValidityVector, + Map idToConstant, + DeleteFilter deleteFilter) { + return 
(ColumnarBatchReader) + TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, + new ReaderBuilder( + expectedSchema, fileSchema, setArrowValidityVector, + idToConstant, ColumnarBatchReader::new, deleteFilter)); + } + + private static class ReaderBuilder extends VectorizedReaderBuilder { + private final DeleteFilter deleteFilter; + + ReaderBuilder( + Schema expectedSchema, + MessageType parquetSchema, + boolean setArrowValidityVector, + Map idToConstant, + Function>, VectorizedReader> readerFactory, + DeleteFilter deleteFilter) { + super(expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory); + this.deleteFilter = deleteFilter; + } + + @Override + protected VectorizedReader vectorizedReader(List> reorderedFields) { + if (deleteFilter == null) { + return super.vectorizedReader(reorderedFields); + } else { + VectorizedReader vecReader = readerFactory().apply(reorderedFields); + if (vecReader instanceof ColumnarBatchReader) { + ((ColumnarBatchReader) vecReader).setDeleteFilter(deleteFilter); + } + return vecReader; + } + } + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java index b58745c7a00d..f0664c7e8e29 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java @@ -91,6 +91,10 @@ abstract class BaseDataReader implements Closeable { this.currentIterator = CloseableIterator.empty(); } + protected Table table() { + return table; + } + public boolean next() throws IOException { try { while (true) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index e4bd3ceba6ce..5f05c55789ed 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -28,8 +28,10 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; @@ -38,10 +40,12 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.TypeUtil; import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnarBatch; class BatchDataReader extends BaseDataReader { @@ -71,11 +75,14 @@ CloseableIterator open(FileScanTask task) { InputFile location = getInputFile(task); Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); if (task.file().format() == FileFormat.PARQUET) { + SparkDeleteFilter deleteFilter = deleteFilter(task); + Parquet.ReadBuilder builder = 
Parquet.read(location) .project(expectedSchema) .split(task.start(), task.length()) .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema, - fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant)) + fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant, + deleteFilter)) .recordsPerBatch(batchSize) .filter(task.residual()) .caseSensitive(caseSensitive) @@ -114,4 +121,27 @@ CloseableIterator open(FileScanTask task) { } return iter.iterator(); } + + private SparkDeleteFilter deleteFilter(FileScanTask task) { + return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema); + } + + private class SparkDeleteFilter extends DeleteFilter { + private final InternalRowWrapper asStructLike; + + SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { + super(task, tableSchema, requestedSchema); + this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); + } + + @Override + protected StructLike asStructLike(InternalRow row) { + return asStructLike.wrap(row); + } + + @Override + protected InputFile getInputFile(String location) { + return BatchDataReader.this.getInputFile(location); + } + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index bb7ac9448bc2..e1420ed0911c 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -175,10 +175,15 @@ public PartitionReaderFactory createReaderFactory() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); + boolean hasNoEqDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasEqDeletes); + boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks); - boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + boolean batchReadOrc = hasNoDeleteFiles && allOrcFileScanTasks; + + boolean batchReadParquet = hasNoEqDeleteFiles && allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives; + + boolean readUsingBatch = batchReadsEnabled && (batchReadOrc || batchReadParquet); int batchSize = readUsingBatch ? 
batchSize(allParquetFileScanTasks, allOrcFileScanTasks) : 0; diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index b7398b7befb2..e1c6355b8e75 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -49,6 +49,8 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.Dataset; @@ -59,14 +61,30 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +@RunWith(Parameterized.class) public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + private final boolean vectorized; + + public TestSparkReaderDeletes(boolean vectorized) { + this.vectorized = vectorized; + } + + @Parameterized.Parameters(name = "vectorized = {0}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {false}, + new Object[] {true} + }; + } @BeforeClass public static void startMetastoreAndSpark() { @@ -106,7 +124,12 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableOperations ops = ((BaseTable) table).operations(); TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); - + if (vectorized) { + table.updateProperties() + .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") + .set(TableProperties.PARQUET_BATCH_SIZE, "4") // split 7 records to two batches to cover more code paths + .commit(); + } return table; } @@ -215,4 +238,28 @@ public void testReadEqualityDeleteRows() throws IOException { Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size()); Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet); } + + @Test + public void testPosDeletesAllRowsInBatch() throws IOException { + // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all deleted. 
+ List> deletes = Lists.newArrayList( + Pair.of(dataFile.path(), 0L), // id = 29 + Pair.of(dataFile.path(), 1L), // id = 43 + Pair.of(dataFile.path(), 2L), // id = 61 + Pair.of(dataFile.path(), 3L) // id = 89 + ); + + Pair posDeletes = FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + + table.newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89); + StructLikeSet actual = rowSet(tableName, table, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } } diff --git a/versions.props b/versions.props index fa6505ad4b38..8eaf00c694c9 100644 --- a/versions.props +++ b/versions.props @@ -1,7 +1,7 @@ org.slf4j:* = 1.7.25 org.apache.avro:avro = 1.10.1 org.apache.calcite:* = 1.10.0 -org.apache.flink:* = 1.12.5 +org.apache.flink:* = 1.13.2 org.apache.hadoop:* = 2.7.3 org.apache.hive:hive-metastore = 2.3.8 org.apache.hive:hive-serde = 2.3.8 @@ -16,6 +16,7 @@ com.google.guava:guava = 28.0-jre com.github.ben-manes.caffeine:caffeine = 2.8.4 org.apache.arrow:arrow-vector = 6.0.0 org.apache.arrow:arrow-memory-netty = 6.0.0 +org.roaringbitmap:RoaringBitmap = 0.9.0 io.netty:netty-buffer = 4.1.63.Final com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1 com.aliyun.oss:aliyun-sdk-oss = 3.10.2 From e7a152b7c600b825ecfcfdffe146aab04bebfa27 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Tue, 26 Oct 2021 13:58:08 -0700 Subject: [PATCH 02/21] Benchmark change --- .../iceberg/spark/source/IcebergSourceBenchmark.java | 2 +- .../IcebergSourceFlatParquetDataDeleteBenchmark.java | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 41c58c397f10..4140003068eb 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -230,7 +230,7 @@ writerFactory, fileFactory, table().io(), } RowDelta rowDelta = table().newRowDelta(); - writer.result().deleteFiles().forEach(deleteFile -> rowDelta.addDeletes(deleteFile)); + writer.result().deleteFiles().forEach(rowDelta::addDeletes); rowDelta.validateDeletedFiles().commit(); } diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java index 86a8f9d021a3..0ebfe022411d 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java @@ -36,6 +36,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.internal.SQLConf; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; @@ -61,9 +62,10 @@ */ public class IcebergSourceFlatParquetDataDeleteBenchmark extends IcebergSourceBenchmark { - private static final int NUM_FILES = 50; - private static final int NUM_ROWS = 100 * 
1000; - private static final double PERCENTAGE_DELETE_ROW = 0.05; + private static final int NUM_FILES = 5; + private static final int NUM_ROWS = 1000 * 1000; + @Param({"0", "0.05", "0.25"}) + private double PERCENTAGE_DELETE_ROW; @Setup public void setupBenchmark() throws IOException { From f8ef69dc287d9edaf2836f5566b3a9cd98b71f47 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Tue, 26 Oct 2021 14:21:26 -0700 Subject: [PATCH 03/21] Change the field name style. --- ...bergSourceFlatParquetDataDeleteBenchmark.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java index 0ebfe022411d..c0b3a9fdd929 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java @@ -53,9 +53,9 @@ * Iceberg. 5% of rows are deleted in each data file. *

* This class uses a dataset with a flat schema. - * To run this benchmark for spark-3: + * To run this benchmark for spark-3.2: * - * ./gradlew :iceberg-spark:iceberg-spark3:jmh + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh * -PjmhIncludeRegex=IcebergSourceFlatParquetDataDeleteBenchmark * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-delete-benchmark-result.txt * @@ -65,7 +65,7 @@ public class IcebergSourceFlatParquetDataDeleteBenchmark extends IcebergSourceBe private static final int NUM_FILES = 5; private static final int NUM_ROWS = 1000 * 1000; @Param({"0", "0.05", "0.25"}) - private double PERCENTAGE_DELETE_ROW; + private double percentageDeleteRow; @Setup public void setupBenchmark() throws IOException { @@ -117,10 +117,12 @@ private void appendData() throws IOException { .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); appendAsFile(df); - // add pos-deletes - table().refresh(); - for (DataFile file : table().currentSnapshot().addedFiles()) { - writePosDeletes(file.path(), NUM_ROWS, PERCENTAGE_DELETE_ROW); + if(percentageDeleteRow > 0) { + // add pos-deletes + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, percentageDeleteRow); + } } } } From 619a7920e766aa7aa33a8953288773748bf88701 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Tue, 26 Oct 2021 14:30:55 -0700 Subject: [PATCH 04/21] Style issue fix. --- .../parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java index c0b3a9fdd929..1d32a29ffac0 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java @@ -117,7 +117,7 @@ private void appendData() throws IOException { .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); appendAsFile(df); - if(percentageDeleteRow > 0) { + if (percentageDeleteRow > 0) { // add pos-deletes table().refresh(); for (DataFile file : table().currentSnapshot().addedFiles()) { From f59b8b07e12c91445264bed493deb24a3861941b Mon Sep 17 00:00:00 2001 From: yufeigu Date: Wed, 27 Oct 2021 15:53:54 -0700 Subject: [PATCH 05/21] Set vectorized read through table properties. 
--- .../IcebergSourceFlatParquetDataDeleteBenchmark.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java index 1d32a29ffac0..be5524175280 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java @@ -34,7 +34,6 @@ import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.internal.SQLConf; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Setup; @@ -94,10 +93,10 @@ public void readIceberg() { @Benchmark @Threads(1) public void readIcebergVectorized() { - Map conf = Maps.newHashMap(); - conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); - conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); - withSQLConf(conf, () -> { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true"); + withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation); materialize(df); @@ -111,7 +110,6 @@ private void appendData() throws IOException { .withColumn("intCol", expr("CAST(longCol AS INT)")) .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) - .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))")) .withColumn("dateCol", date_add(current_date(), fileNum)) .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); @@ -134,7 +132,6 @@ protected final Table initTable() { required(2, "intCol", Types.IntegerType.get()), required(3, "floatCol", Types.FloatType.get()), optional(4, "doubleCol", Types.DoubleType.get()), - optional(5, "decimalCol", Types.DecimalType.of(20, 5)), optional(6, "dateCol", Types.DateType.get()), optional(7, "timestampCol", Types.TimestampType.withZone()), optional(8, "stringCol", Types.StringType.get())); From 4bce76a1f19d9311a5e9649d60c15baffcc31996 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Thu, 28 Oct 2021 17:37:17 -0700 Subject: [PATCH 06/21] Enforce the netty buffer version to make sure jmh test can pass. --- spark/v3.2/build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index efc9a2e0108d..fca13b63b4d4 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -33,6 +33,8 @@ configure(sparkProjects) { resolutionStrategy { force 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.12.3' force 'com.fasterxml.jackson.module:jackson-module-paranamer:2.12.3' + // enforce the newer version of netty buffer in test + force 'io.netty:netty-buffer:4.1.68.Final' } } } From c0c890b0f3e3ec51e28151b11673cec7c6752c7c Mon Sep 17 00:00:00 2001 From: yufeigu Date: Mon, 1 Nov 2021 18:30:36 -0700 Subject: [PATCH 07/21] Add tests for multiple row groups and constant column vector. 
--- .../TestSparkParquetReadMetadataColumns.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index b68db024aab3..b379b09bcfaa 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -30,6 +30,7 @@ import org.apache.iceberg.Files; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; @@ -38,8 +39,10 @@ import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.iceberg.spark.source.BatchDataReader; import org.apache.iceberg.types.Types; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.hadoop.ParquetFileReader; @@ -58,8 +61,11 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.roaringbitmap.longlong.Roaring64Bitmap; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @RunWith(Parameterized.class) public class TestSparkParquetReadMetadataColumns { @@ -165,6 +171,31 @@ public void testReadRowNumbers() throws IOException { readAndValidate(null, null, null, EXPECTED_ROWS); } + @Test + public void testReadRowNumbersWithDelete() throws IOException { + if (vectorized) { + List expectedRowsAfterDelete = new ArrayList<>(EXPECTED_ROWS); + // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and [100, 200) + for (int i = 1; i <= 5; i++) { + expectedRowsAfterDelete.remove(98); + } + + Parquet.ReadBuilder builder = Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA); + + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + Roaring64Bitmap deletedRowPos = new Roaring64Bitmap(); + deletedRowPos.add(98, 103); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + + builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA, + fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, Maps.newHashMap(), deleteFilter)); + builder.recordsPerBatch(RECORDS_PER_BATCH); + + validate(expectedRowsAfterDelete, builder); + } + } + @Test public void testReadRowNumbersWithFilter() throws IOException { // current iceberg supports row group filter. @@ -212,6 +243,10 @@ private void readAndValidate(Expression filter, Long splitStart, Long splitLengt builder = builder.split(splitStart, splitLength); } + validate(expected, builder); + } + + private void validate(List expected, Parquet.ReadBuilder builder) throws IOException { try (CloseableIterable reader = vectorized ? 
batchesToRows(builder.build()) : builder.build()) { final Iterator actualRows = reader.iterator(); From 5c7d230f79ebb9576872d21f88bf299a82010913 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Mon, 1 Nov 2021 18:38:47 -0700 Subject: [PATCH 08/21] Add tests for multiple row groups and constant column vector. --- .../iceberg/spark/data/TestSparkParquetReadMetadataColumns.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index b379b09bcfaa..854a8f0ec499 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -42,7 +42,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; -import org.apache.iceberg.spark.source.BatchDataReader; import org.apache.iceberg.types.Types; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.hadoop.ParquetFileReader; From 2b5204142713721ceffee37ee80645df7e1986e4 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Mon, 1 Nov 2021 18:51:40 -0700 Subject: [PATCH 09/21] Benchmark refactor --- .../parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java index be5524175280..b9bac546e9e7 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java @@ -61,8 +61,8 @@ */ public class IcebergSourceFlatParquetDataDeleteBenchmark extends IcebergSourceBenchmark { - private static final int NUM_FILES = 5; - private static final int NUM_ROWS = 1000 * 1000; + private static final int NUM_FILES = 1; + private static final int NUM_ROWS = 10 * 1000 * 1000; @Param({"0", "0.05", "0.25"}) private double percentageDeleteRow; From fee319e918a29f64fa417c66fe6ef2c4828dbc71 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Mon, 1 Nov 2021 18:52:05 -0700 Subject: [PATCH 10/21] Revert "Enforce the netty buffer version to make sure jmh test can pass." This reverts commit 9e35f8b688a8f6f81b1d0113d837c67ddcf8e69a. 
--- spark/v3.2/build.gradle | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index fca13b63b4d4..e6fd4a9cacfe 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -33,8 +33,6 @@ configure(sparkProjects) { resolutionStrategy { force 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.12.3' force 'com.fasterxml.jackson.module:jackson-module-paranamer:2.12.3' - // enforce the newer version of netty buffer in test - force 'io.netty:netty-buffer:4.1.68.Final' } } } @@ -66,8 +64,6 @@ project(':iceberg-spark:iceberg-spark-3.2') { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' - // to make sure io.netty.buffer only comes from project(':iceberg-arrow') - exclude group: 'io.netty', module: 'netty-buffer' } implementation("org.apache.orc:orc-core::nohive") { From 2ce64cd4eddb1248567710353393556bed396358 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Wed, 3 Nov 2021 10:51:45 -0700 Subject: [PATCH 11/21] Resolve comments on the benchmark. --- .../apache/iceberg/spark/source/IcebergSourceBenchmark.java | 2 +- .../parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 4140003068eb..56582002432d 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -64,7 +64,7 @@ public abstract class IcebergSourceBenchmark { private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceBenchmark.class); - private static final long TARGET_FILE_SIZE_IN_BYTES = 50L * 1024 * 1024; + private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024; private final Configuration hadoopConf = initHadoopConf(); private final Table table = initTable(); diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java index b9bac546e9e7..fcf2e62f748f 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java @@ -63,7 +63,7 @@ public class IcebergSourceFlatParquetDataDeleteBenchmark extends IcebergSourceBe private static final int NUM_FILES = 1; private static final int NUM_ROWS = 10 * 1000 * 1000; - @Param({"0", "0.05", "0.25"}) + @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"}) private double percentageDeleteRow; @Setup @@ -107,7 +107,7 @@ private void appendData() throws IOException { for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { Dataset df = spark().range(NUM_ROWS) .withColumnRenamed("id", "longCol") - .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)")) .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) .withColumn("dateCol", date_add(current_date(), fileNum)) From 24c9ef9eeb13457474499b014b964384423fb3e3 Mon Sep 
17 00:00:00 2001 From: yufeigu Date: Wed, 3 Nov 2021 20:09:27 -0700 Subject: [PATCH 12/21] Add more benchmarks --- .../source/IcebergSourceDeleteBenchmark.java | 212 ++++++++++++++++++ ...gSourceFlatParquetDataDeleteBenchmark.java | 155 ------------- .../IcebergSourceParquetDeleteBenchmark.java | 63 ++++++ ...SourceParquetMultiDeleteFileBenchmark.java | 60 +++++ ...ceParquetWithUnrelatedDeleteBenchmark.java | 62 +++++ 5 files changed, 397 insertions(+), 155 deletions(-) create mode 100644 spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java delete mode 100644 spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java create mode 100644 spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java create mode 100644 spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java create mode 100644 spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java new file mode 100644 index 000000000000..3483fe07c479 --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +public abstract class IcebergSourceDeleteBenchmark extends IcebergSourceBenchmark { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class); + private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024; + + protected static final int NUM_FILES = 1; + protected static final int NUM_ROWS = 10 * 1000 * 1000; + + @Setup + public void setupBenchmark() throws IOException { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readIcebergVectorized() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true"); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + protected abstract void appendData() throws IOException; + + protected void writeData(int fileNum) { + Dataset df = spark().range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), fileNum)) + .withColumn("timestampCol", 
expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + appendAsFile(df); + } + + @Override + protected Table initTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + Map properties = Maps.newHashMap(); + properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); + properties.put(TableProperties.FORMAT_VERSION, "2"); + return tables.create(schema, partitionSpec, properties, newTableLocation()); + } + + @Override + protected Configuration initHadoopConf() { + return new Configuration(); + } + + protected void writePosDeletes(CharSequence path, long numRows, double percentage) throws IOException { + writePosDeletes(path, numRows, percentage, 1); + } + + protected void writePosDeletes(CharSequence path, long numRows, double percentage, + int numDeleteFile) throws IOException { + writePosDeletesWithNoise(path, numRows, percentage, 0, numDeleteFile); + } + + protected void writePosDeletesWithNoise(CharSequence path, long numRows, double percentage, int numNoise, + int numDeleteFile) throws IOException { + Set deletedPos = Sets.newHashSet(); + while (deletedPos.size() < numRows * percentage) { + deletedPos.add(ThreadLocalRandom.current().nextLong(numRows)); + } + LOG.info("pos delete row count: {}, num of delete files: {}", deletedPos.size(), numDeleteFile); + + int partitionSize = (int) (numRows * percentage) / numDeleteFile; + Iterable> sets = Iterables.partition(deletedPos, partitionSize); + for (List item : sets) { + writePosDeletes(path, item, numNoise); + } + } + + protected void writePosDeletes(CharSequence path, List deletedPos, int numNoise) throws IOException { + OutputFileFactory fileFactory = newFileFactory(); + SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table()) + .dataFileFormat(fileFormat()) + .build(); + + ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table().io(), + fileFormat(), TARGET_FILE_SIZE_IN_BYTES); + + PartitionSpec unpartitionedSpec = table().specs().get(0); + + + PositionDelete positionDelete = PositionDelete.create(); + try (ClusteredPositionDeleteWriter closeableWriter = writer) { + for (Long pos : deletedPos) { + positionDelete.set(path, pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + for (int i = 0; i < numNoise; i++) { + positionDelete.set(noisePath(path), pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + } + } + } + + RowDelta rowDelta = table().newRowDelta(); + writer.result().deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.validateDeletedFiles().commit(); + } + + private OutputFileFactory newFileFactory() { + return OutputFileFactory.builderFor(table(), 1, 1) + .format(fileFormat()) + .build(); + } + + private CharSequence noisePath(CharSequence path) { + // assume the data file name would be something like "00000-0-30da64e0-56b5-4743-a11b-3188a1695bf7-00001.parquet" + // so the dataFileSuffixLen is the UUID string length + length of "-00001.parquet", which is 36 + 14 = 60. 
It's OK + // to be not accurate here. + int dataFileSuffixLen = 60; + UUID uuid = UUID.randomUUID(); + if (path.length() > dataFileSuffixLen) { + return path.subSequence(0, dataFileSuffixLen) + uuid.toString(); + } else { + return uuid.toString(); + } + } +} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java deleted file mode 100644 index fcf2e62f748f..000000000000 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.spark.source.parquet; - -import java.io.IOException; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.source.IcebergSourceBenchmark; -import org.apache.iceberg.types.Types; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Threads; - -import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.apache.spark.sql.functions.current_date; -import static org.apache.spark.sql.functions.date_add; -import static org.apache.spark.sql.functions.expr; - -/** - * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for - * Iceberg. 5% of rows are deleted in each data file. - *
<p>
- * This class uses a dataset with a flat schema. - * To run this benchmark for spark-3.2: - * - * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh - * -PjmhIncludeRegex=IcebergSourceFlatParquetDataDeleteBenchmark - * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-delete-benchmark-result.txt - * - */ -public class IcebergSourceFlatParquetDataDeleteBenchmark extends IcebergSourceBenchmark { - - private static final int NUM_FILES = 1; - private static final int NUM_ROWS = 10 * 1000 * 1000; - @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"}) - private double percentageDeleteRow; - - @Setup - public void setupBenchmark() throws IOException { - setupSpark(); - appendData(); - } - - @TearDown - public void tearDownBenchmark() throws IOException { - tearDownSpark(); - cleanupFiles(); - } - - @Benchmark - @Threads(1) - public void readIceberg() { - Map tableProperties = Maps.newHashMap(); - tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); - withTableProperties(tableProperties, () -> { - String tableLocation = table().location(); - Dataset df = spark().read().format("iceberg").load(tableLocation); - materialize(df); - }); - } - - @Benchmark - @Threads(1) - public void readIcebergVectorized() { - Map tableProperties = Maps.newHashMap(); - tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); - tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true"); - withTableProperties(tableProperties, () -> { - String tableLocation = table().location(); - Dataset df = spark().read().format("iceberg").load(tableLocation); - materialize(df); - }); - } - - private void appendData() throws IOException { - for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { - Dataset df = spark().range(NUM_ROWS) - .withColumnRenamed("id", "longCol") - .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)")) - .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) - .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) - .withColumn("dateCol", date_add(current_date(), fileNum)) - .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) - .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); - appendAsFile(df); - - if (percentageDeleteRow > 0) { - // add pos-deletes - table().refresh(); - for (DataFile file : table().currentSnapshot().addedFiles()) { - writePosDeletes(file.path(), NUM_ROWS, percentageDeleteRow); - } - } - } - } - - @Override - protected final Table initTable() { - Schema schema = new Schema( - required(1, "longCol", Types.LongType.get()), - required(2, "intCol", Types.IntegerType.get()), - required(3, "floatCol", Types.FloatType.get()), - optional(4, "doubleCol", Types.DoubleType.get()), - optional(6, "dateCol", Types.DateType.get()), - optional(7, "timestampCol", Types.TimestampType.withZone()), - optional(8, "stringCol", Types.StringType.get())); - PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); - HadoopTables tables = new HadoopTables(hadoopConf()); - Map properties = Maps.newHashMap(); - properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); - properties.put(TableProperties.FORMAT_VERSION, "2"); - return tables.create(schema, partitionSpec, properties, newTableLocation()); - } - - @Override - protected Configuration initHadoopConf() { - return new Configuration(); - } - - @Override - protected FileFormat fileFormat() { - return FileFormat.PARQUET; - } -} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java 
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java new file mode 100644 index 000000000000..234c6c5666ca --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for + * Iceberg. + *
<p>
+ * This class uses a dataset with a flat schema. + * To run this benchmark for spark-3.2: + * + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetDeleteBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"}) + private double percentDeleteRow; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + if (percentDeleteRow > 0) { + // add pos-deletes + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, percentDeleteRow); + } + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java new file mode 100644 index 000000000000..fb3933186c4f --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for + * Iceberg. + *
<p>
+ * This class uses a dataset with a flat schema. + * To run this benchmark for spark-3.2: + * + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetMultiDeleteFileBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-multi-delete-file-benchmark-result.txt + * + */ +public class IcebergSourceParquetMultiDeleteFileBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"1", "2", "5", "10"}) + private int numDeleteFile; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, 0.25, numDeleteFile); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java new file mode 100644 index 000000000000..a06cadb1e525 --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for + * Iceberg. + *
<p>
+ * This class uses a dataset with a flat schema. + * To run this benchmark for spark-3.2: + * + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetWithUnrelatedDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-with-unrelated-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetWithUnrelatedDeleteBenchmark extends IcebergSourceDeleteBenchmark { + private static final double PERCENT_DELETE_ROW = 0.05; + @Param({"0", "0.05", "0.25", "0.5"}) + private double percentUnrelatedDeletes; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletesWithNoise(file.path(), NUM_ROWS, PERCENT_DELETE_ROW, + (int) (percentUnrelatedDeletes / PERCENT_DELETE_ROW), 1); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} From 67767d11327ecaf0f3638576b89df81649105433 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Thu, 4 Nov 2021 12:24:59 -0700 Subject: [PATCH 13/21] Resolve comments. --- .../vectorized/VectorizedReaderBuilder.java | 4 -- spark/v3.0/build.gradle | 2 - spark/v3.2/build.gradle | 2 + .../spark/source/IcebergSourceBenchmark.java | 51 ------------------- .../source/IcebergSourceDeleteBenchmark.java | 15 +++--- .../data/vectorized/ColumnarBatchReader.java | 2 + .../VectorizedSparkParquetReaders.java | 36 ++++++------- versions.props | 2 +- 8 files changed, 27 insertions(+), 87 deletions(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java index 5a9a174dc6ec..04b99db54dc3 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java @@ -61,10 +61,6 @@ public VectorizedReaderBuilder( this.readerFactory = readerFactory; } - public Function>, VectorizedReader> readerFactory() { - return readerFactory; - } - @Override public VectorizedReader message( Types.StructType expected, MessageType message, diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle index daff1fdd4dca..7df4c62e5253 100644 --- a/spark/v3.0/build.gradle +++ b/spark/v3.0/build.gradle @@ -80,8 +80,6 @@ project(':iceberg-spark:iceberg-spark3') { exclude group: 'com.google.code.findbugs', module: 'jsr305' } - implementation "org.roaringbitmap:RoaringBitmap" - testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' } diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index e6fd4a9cacfe..efc9a2e0108d 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -64,6 +64,8 @@ project(':iceberg-spark:iceberg-spark-3.2') { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' + // to make sure io.netty.buffer only comes from project(':iceberg-arrow') + exclude group: 'io.netty', module: 'netty-buffer' } implementation("org.apache.orc:orc-core::nohive") { diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 56582002432d..ca1e92fdb7ae 100644 --- 
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -22,28 +22,19 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.RowDelta; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateProperties; -import org.apache.iceberg.deletes.PositionDelete; -import org.apache.iceberg.io.ClusteredPositionDeleteWriter; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -53,8 +44,6 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Fork(1) @State(Scope.Benchmark) @@ -63,9 +52,6 @@ @BenchmarkMode(Mode.SingleShotTime) public abstract class IcebergSourceBenchmark { - private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceBenchmark.class); - private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024; - private final Configuration hadoopConf = initHadoopConf(); private final Table table = initTable(); private SparkSession spark; @@ -203,43 +189,6 @@ protected void withTableProperties(Map props, Action action) { } } - protected void writePosDeletes(CharSequence path, long numRows, double percentage) throws IOException { - OutputFileFactory fileFactory = newFileFactory(); - SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table()) - .dataFileFormat(fileFormat()) - .build(); - - ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( - writerFactory, fileFactory, table().io(), - fileFormat(), TARGET_FILE_SIZE_IN_BYTES); - - PartitionSpec unpartitionedSpec = table().specs().get(0); - Set deletedPos = Sets.newHashSet(); - while (deletedPos.size() < numRows * percentage) { - deletedPos.add(ThreadLocalRandom.current().nextLong(numRows)); - } - - LOG.info("pos delete row count: {}", deletedPos.size()); - - PositionDelete positionDelete = PositionDelete.create(); - try (ClusteredPositionDeleteWriter closeableWriter = writer) { - for (Long pos : deletedPos) { - positionDelete.set(path, pos, null); - closeableWriter.write(positionDelete, unpartitionedSpec, null); - } - } - - RowDelta rowDelta = table().newRowDelta(); - writer.result().deleteFiles().forEach(rowDelta::addDeletes); - rowDelta.validateDeletedFiles().commit(); - } - - private OutputFileFactory newFileFactory() { - return OutputFileFactory.builderFor(table(), 1, 1) - .format(fileFormat()) - .build(); - } - protected FileFormat fileFormat() { throw new UnsupportedOperationException("Unsupported file format"); } diff --git 
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java index 3483fe07c479..d5a6db3b385c 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -156,7 +156,7 @@ protected void writePosDeletesWithNoise(CharSequence path, long numRows, double int partitionSize = (int) (numRows * percentage) / numDeleteFile; Iterable> sets = Iterables.partition(deletedPos, partitionSize); - for (List item : sets) { + for (List item : sets) { writePosDeletes(path, item, numNoise); } } @@ -164,16 +164,15 @@ protected void writePosDeletesWithNoise(CharSequence path, long numRows, double protected void writePosDeletes(CharSequence path, List deletedPos, int numNoise) throws IOException { OutputFileFactory fileFactory = newFileFactory(); SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table()) - .dataFileFormat(fileFormat()) - .build(); + .dataFileFormat(fileFormat()) + .build(); ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( - writerFactory, fileFactory, table().io(), - fileFormat(), TARGET_FILE_SIZE_IN_BYTES); + writerFactory, fileFactory, table().io(), + fileFormat(), TARGET_FILE_SIZE_IN_BYTES); PartitionSpec unpartitionedSpec = table().specs().get(0); - PositionDelete positionDelete = PositionDelete.create(); try (ClusteredPositionDeleteWriter closeableWriter = writer) { for (Long pos : deletedPos) { @@ -193,8 +192,8 @@ writerFactory, fileFactory, table().io(), private OutputFileFactory newFileFactory() { return OutputFileFactory.builderFor(table(), 1, 1) - .format(fileFormat()) - .build(); + .format(fileFormat()) + .build(); } private CharSequence noisePath(CharSequence path) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index da67f6ce4794..bdffc394ec44 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -118,6 +118,7 @@ private Pair buildRowIdMapping(Roaring64Bitmap deletedRowPositio if (deletedRowPositions == null) { return null; } + int[] rowIdMapping = new int[numRows]; int originalRowId = 0; int currentRowId = 0; @@ -130,6 +131,7 @@ private Pair buildRowIdMapping(Roaring64Bitmap deletedRowPositio } if (currentRowId == numRows) { + // there is no delete in this batch return null; } else { return Pair.of(rowIdMapping, currentRowId); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 4740a66ae47c..020b35f52844 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -55,12 +55,11 @@ public static ColumnarBatchReader buildReader( idToConstant, ColumnarBatchReader::new)); } - public static ColumnarBatchReader buildReader( - Schema expectedSchema, - MessageType fileSchema, - boolean 
setArrowValidityVector, - Map idToConstant, - DeleteFilter deleteFilter) { + public static ColumnarBatchReader buildReader(Schema expectedSchema, + MessageType fileSchema, + boolean setArrowValidityVector, + Map idToConstant, + DeleteFilter deleteFilter) { return (ColumnarBatchReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, new ReaderBuilder( @@ -71,28 +70,23 @@ public static ColumnarBatchReader buildReader( private static class ReaderBuilder extends VectorizedReaderBuilder { private final DeleteFilter deleteFilter; - ReaderBuilder( - Schema expectedSchema, - MessageType parquetSchema, - boolean setArrowValidityVector, - Map idToConstant, - Function>, VectorizedReader> readerFactory, - DeleteFilter deleteFilter) { + ReaderBuilder(Schema expectedSchema, + MessageType parquetSchema, + boolean setArrowValidityVector, + Map idToConstant, + Function>, VectorizedReader> readerFactory, + DeleteFilter deleteFilter) { super(expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory); this.deleteFilter = deleteFilter; } @Override protected VectorizedReader vectorizedReader(List> reorderedFields) { - if (deleteFilter == null) { - return super.vectorizedReader(reorderedFields); - } else { - VectorizedReader vecReader = readerFactory().apply(reorderedFields); - if (vecReader instanceof ColumnarBatchReader) { - ((ColumnarBatchReader) vecReader).setDeleteFilter(deleteFilter); - } - return vecReader; + VectorizedReader reader = super.vectorizedReader(reorderedFields); + if (deleteFilter != null) { + ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter); } + return reader; } } } diff --git a/versions.props b/versions.props index 8eaf00c694c9..0becbf59f600 100644 --- a/versions.props +++ b/versions.props @@ -1,7 +1,7 @@ org.slf4j:* = 1.7.25 org.apache.avro:avro = 1.10.1 org.apache.calcite:* = 1.10.0 -org.apache.flink:* = 1.13.2 +org.apache.flink:* = 1.12.5 org.apache.hadoop:* = 2.7.3 org.apache.hive:hive-metastore = 2.3.8 org.apache.hive:hive-serde = 2.3.8 From f1c7df78be794bda294c1fd66dddf47335a99e89 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Thu, 4 Nov 2021 12:45:15 -0700 Subject: [PATCH 14/21] Add RoaringBitmap dependency to Spark3.0 and Spark3.1 --- spark/v3.0/build.gradle | 2 ++ spark/v3.1/build.gradle | 2 ++ 2 files changed, 4 insertions(+) diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle index 7df4c62e5253..daff1fdd4dca 100644 --- a/spark/v3.0/build.gradle +++ b/spark/v3.0/build.gradle @@ -80,6 +80,8 @@ project(':iceberg-spark:iceberg-spark3') { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + implementation "org.roaringbitmap:RoaringBitmap" + testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' } diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle index 94befdf59edd..568b805d5c3a 100644 --- a/spark/v3.1/build.gradle +++ b/spark/v3.1/build.gradle @@ -80,6 +80,8 @@ project(':iceberg-spark:iceberg-spark-3.1') { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + implementation "org.roaringbitmap:RoaringBitmap" + testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' } From 9b72f7cf88ee7f18ff7d399220ac020ffb99b1f1 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Thu, 4 Nov 2021 18:35:10 -0700 Subject: [PATCH 15/21] Shaded org.roaringbitmap in Spark3.2. 
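Note on the ColumnarBatchReader changes touched above: once a DeleteFilter is attached, the reader builds a row-id mapping for each batch that skips positions marked as deleted and records how many rows survive, short-circuiting to null when there is no delete in the batch. A standalone sketch of that mapping logic follows, using a plain boolean[] as a stand-in for the position-delete bitmap; it returns the surviving row ids directly instead of the Pair used by the real reader, and all names are illustrative.

import java.util.Arrays;

public class RowIdMappingSketch {
  // Maps the surviving batch-local row ids, or returns null when nothing in the batch is deleted.
  static int[] buildRowIdMapping(boolean[] deletedPositions, long rowStartPosInBatch, int numRows) {
    int[] rowIdMapping = new int[numRows];
    int currentRowId = 0;
    for (int originalRowId = 0; originalRowId < numRows; originalRowId++) {
      if (!deletedPositions[(int) (rowStartPosInBatch + originalRowId)]) {
        rowIdMapping[currentRowId++] = originalRowId;
      }
    }

    if (currentRowId == numRows) {
      // no position in this batch was deleted, so no remapping is needed
      return null;
    }
    return Arrays.copyOf(rowIdMapping, currentRowId);
  }

  public static void main(String[] args) {
    boolean[] deleted = new boolean[10];
    deleted[3] = true;
    deleted[4] = true;
    // a batch of 10 rows starting at file position 0; rows 3 and 4 are filtered out
    System.out.println(Arrays.toString(buildRowIdMapping(deleted, 0, 10)));
    // prints [0, 1, 2, 5, 6, 7, 8, 9]
  }
}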
--- spark/v3.1/build.gradle | 2 -- spark/v3.2/build.gradle | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle index 568b805d5c3a..94befdf59edd 100644 --- a/spark/v3.1/build.gradle +++ b/spark/v3.1/build.gradle @@ -80,8 +80,6 @@ project(':iceberg-spark:iceberg-spark-3.1') { exclude group: 'com.google.code.findbugs', module: 'jsr305' } - implementation "org.roaringbitmap:RoaringBitmap" - testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' } diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index efc9a2e0108d..a332e44e0b0d 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -246,6 +246,7 @@ project(':iceberg-spark:iceberg-spark-3.2-runtime') { relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' // relocate Antlr runtime and related deps to shade Iceberg specific version relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4' + relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap' classifier null } From f55c7c99aa6c6ae6d5c42374d1c08f3c3413280c Mon Sep 17 00:00:00 2001 From: yufeigu Date: Thu, 4 Nov 2021 18:39:46 -0700 Subject: [PATCH 16/21] Resolve comments. --- .../iceberg/arrow/vectorized/VectorizedReaderBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java index 04b99db54dc3..ad69d5e87e74 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java @@ -97,7 +97,7 @@ public VectorizedReader message( return vectorizedReader(reorderedFields); } - protected VectorizedReader vectorizedReader(List> reorderedFields) { + protected VectorizedReader vectorizedReader(List> reorderedFields) { return readerFactory.apply(reorderedFields); } From 6070ce492d8f7ac339836b3ebddc7ccdc6ec5e31 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Fri, 5 Nov 2021 15:24:59 -0700 Subject: [PATCH 17/21] Add interface PositionDeleteIndex to avoid expose RoaringBitmap in public method. --- .../deletes/BitmapPositionDeleteIndex.java | 45 +++++++++++++++++++ .../org/apache/iceberg/deletes/Deletes.java | 13 +++--- .../iceberg/deletes/PositionDeleteIndex.java | 42 +++++++++++++++++ .../org/apache/iceberg/data/DeleteFilter.java | 5 ++- spark/v2.4/build.gradle | 2 + spark/v3.0/build.gradle | 5 ++- spark/v3.1/build.gradle | 1 + spark/v3.2/build.gradle | 4 +- .../data/vectorized/ColumnarBatchReader.java | 6 +-- .../TestSparkParquetReadMetadataColumns.java | 7 +-- 10 files changed, 111 insertions(+), 19 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java create mode 100644 core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java new file mode 100644 index 000000000000..291d8eebdb91 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.deletes; + +import org.roaringbitmap.longlong.Roaring64Bitmap; + +public class BitmapPositionDeleteIndex implements PositionDeleteIndex { + private final Roaring64Bitmap roaring64Bitmap; + + public BitmapPositionDeleteIndex() { + roaring64Bitmap = new Roaring64Bitmap(); + } + + @Override + public void delete(long position) { + roaring64Bitmap.add(position); + } + + @Override + public void delete(long posStart, long posEnd) { + roaring64Bitmap.add(posStart, posEnd); + } + + @Override + public boolean deleted(long position) { + return roaring64Bitmap.contains(position); + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 6b649e34e988..41c7793c769f 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -42,7 +42,6 @@ import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; import org.apache.iceberg.util.StructLikeSet; -import org.roaringbitmap.longlong.Roaring64Bitmap; public class Deletes { private static final Schema POSITION_DELETE_SCHEMA = new Schema( @@ -108,19 +107,19 @@ public static Set toPositionSet(CloseableIterable posDeletes) { } } - public static Roaring64Bitmap toPositionBitmap(CharSequence dataLocation, - List> deleteFiles) { + public static PositionDeleteIndex toPositionBitmap(CharSequence dataLocation, + List> deleteFiles) { DataFileFilter locationFilter = new DataFileFilter<>(dataLocation); List> positions = Lists.transform(deleteFiles, deletes -> CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row))); return toPositionBitmap(CloseableIterable.concat(positions)); } - public static Roaring64Bitmap toPositionBitmap(CloseableIterable posDeletes) { + public static PositionDeleteIndex toPositionBitmap(CloseableIterable posDeletes) { try (CloseableIterable deletes = posDeletes) { - Roaring64Bitmap bitmap = new Roaring64Bitmap(); - deletes.forEach(bitmap::add); - return bitmap; + PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(); + deletes.forEach(positionDeleteIndex::delete); + return positionDeleteIndex; } catch (IOException e) { throw new UncheckedIOException("Failed to close position delete source", e); } diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java new file mode 100644 index 000000000000..73ef397a9450 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.deletes; + +public interface PositionDeleteIndex { + /** + * Set a deleted row position. + * @param position the deleted row position + */ + void delete(long position); + + /** + * Set a range of deleted row positions. + * @param posStart inclusive beginning of position range + * @param posEnd exclusive ending of position range + */ + void delete(long posStart, long posEnd); + + /** + * Checks whether a row at the position is deleted. + * @param position deleted row position + * @return whether the position is deleted + */ + boolean deleted(long position); +} diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 118ce0ac7477..37eee4a2655a 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -37,6 +37,7 @@ import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; @@ -70,7 +71,7 @@ public abstract class DeleteFilter { private final Schema requiredSchema; private final Accessor posAccessor; - private Roaring64Bitmap deleteRowPositions = null; + private PositionDeleteIndex deleteRowPositions = null; protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; @@ -192,7 +193,7 @@ protected boolean shouldKeep(T item) { return remainingRowsFilter.filter(records); } - public Roaring64Bitmap deletedRowPositions() { + public PositionDeleteIndex deletedRowPositions() { if (posDeletes.isEmpty()) { return null; } diff --git a/spark/v2.4/build.gradle b/spark/v2.4/build.gradle index 0789f12fcc5b..22f4abb03f12 100644 --- a/spark/v2.4/build.gradle +++ b/spark/v2.4/build.gradle @@ -65,6 +65,7 @@ project(':iceberg-spark:iceberg-spark2') { compileOnly "org.apache.avro:avro" compileOnly("org.apache.spark:spark-hive_2.11") { exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.roaringbitmap' } implementation("org.apache.orc:orc-core::nohive") { @@ -159,6 +160,7 @@ project(':iceberg-spark:iceberg-spark-runtime') { relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow' relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch' relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' + relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap' classifier null } diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle index 
daff1fdd4dca..2ad0fb009d88 100644 --- a/spark/v3.0/build.gradle +++ b/spark/v3.0/build.gradle @@ -64,6 +64,7 @@ project(':iceberg-spark:iceberg-spark3') { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' + exclude group: 'org.roaringbitmap' } implementation("org.apache.orc:orc-core::nohive") { @@ -80,8 +81,6 @@ project(':iceberg-spark:iceberg-spark3') { exclude group: 'com.google.code.findbugs', module: 'jsr305' } - implementation "org.roaringbitmap:RoaringBitmap" - testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' } @@ -136,6 +135,7 @@ project(":iceberg-spark:iceberg-spark3-extensions") { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' + exclude group: 'org.roaringbitmap' } testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') @@ -244,6 +244,7 @@ project(':iceberg-spark:iceberg-spark3-runtime') { relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' // relocate Antlr runtime and related deps to shade Iceberg specific version relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4' + relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap' classifier null } diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle index 94befdf59edd..86ecc8ffe6a8 100644 --- a/spark/v3.1/build.gradle +++ b/spark/v3.1/build.gradle @@ -64,6 +64,7 @@ project(':iceberg-spark:iceberg-spark-3.1') { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' + exclude group: 'org.roaringbitmap' } implementation("org.apache.orc:orc-core::nohive") { diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index a332e44e0b0d..8b0200fb99a9 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -66,6 +66,7 @@ project(':iceberg-spark:iceberg-spark-3.2') { exclude group: 'org.apache.arrow' // to make sure io.netty.buffer only comes from project(':iceberg-arrow') exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'org.roaringbitmap' } implementation("org.apache.orc:orc-core::nohive") { @@ -82,8 +83,6 @@ project(':iceberg-spark:iceberg-spark-3.2') { exclude group: 'com.google.code.findbugs', module: 'jsr305' } - implementation "org.roaringbitmap:RoaringBitmap" - testImplementation("org.apache.hadoop:hadoop-minicluster") { exclude group: 'org.apache.avro', module: 'avro' // to make sure io.netty.buffer only comes from project(':iceberg-arrow') @@ -140,6 +139,7 @@ project(":iceberg-spark:iceberg-spark-3.2-extensions") { exclude group: 'org.apache.arrow' // to make sure io.netty.buffer only comes from project(':iceberg-arrow') exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'org.roaringbitmap' } testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index bdffc394ec44..cc4858f1d61b 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ 
-24,6 +24,7 @@ import org.apache.iceberg.arrow.vectorized.BaseBatchReader; import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader; import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.Pair; @@ -33,7 +34,6 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.roaringbitmap.longlong.Roaring64Bitmap; /** * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized read path. The @@ -114,7 +114,7 @@ private Pair rowIdMapping(int numRows) { * @param numRows the num of rows * @return the mapping array and the new num of rows in a batch, null if no row is deleted */ - private Pair buildRowIdMapping(Roaring64Bitmap deletedRowPositions, int numRows) { + private Pair buildRowIdMapping(PositionDeleteIndex deletedRowPositions, int numRows) { if (deletedRowPositions == null) { return null; } @@ -123,7 +123,7 @@ private Pair buildRowIdMapping(Roaring64Bitmap deletedRowPositio int originalRowId = 0; int currentRowId = 0; while (originalRowId < numRows) { - if (!deletedRowPositions.contains(originalRowId + rowStartPosInBatch)) { + if (!deletedRowPositions.deleted(originalRowId + rowStartPosInBatch)) { rowIdMapping[currentRowId] = originalRowId; currentRowId++; } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 854a8f0ec499..d86b979c8612 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -31,6 +31,8 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.BitmapPositionDeleteIndex; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; @@ -60,7 +62,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.roaringbitmap.longlong.Roaring64Bitmap; import static org.apache.iceberg.types.Types.NestedField.required; import static org.mockito.Mockito.mock; @@ -183,8 +184,8 @@ public void testReadRowNumbersWithDelete() throws IOException { DeleteFilter deleteFilter = mock(DeleteFilter.class); when(deleteFilter.hasPosDeletes()).thenReturn(true); - Roaring64Bitmap deletedRowPos = new Roaring64Bitmap(); - deletedRowPos.add(98, 103); + PositionDeleteIndex deletedRowPos = new BitmapPositionDeleteIndex(); + deletedRowPos.delete(98, 103); when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA, From c94d84f74685ea24ec3083225c4c2c9e7e25b272 Mon Sep 17 00:00:00 2001 From: yufeigu Date: Fri, 5 Nov 2021 15:54:04 -0700 Subject: [PATCH 18/21] Remove unused import. 
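Note on the PositionDeleteIndex interface introduced above: DeleteFilter and ColumnarBatchReader now depend only on delete(...) and deleted(...) and obtain instances through Deletes.toPositionBitmap, so the RoaringBitmap types stay an implementation detail that the Spark builds can exclude and shade. A small usage sketch against the methods added in this series; the wrapper class and the sample positions are illustrative, not part of the patch.

import java.util.Arrays;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.io.CloseableIterable;

public class PositionDeleteIndexUsage {
  public static void main(String[] args) {
    // positions 3, 7 and 42 of some data file carry position deletes
    CloseableIterable<Long> posDeletes = CloseableIterable.withNoopClose(Arrays.asList(3L, 7L, 42L));

    PositionDeleteIndex index = Deletes.toPositionBitmap(posDeletes);
    System.out.println(index.deleted(7)); // true
    System.out.println(index.deleted(8)); // false
  }
}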

From c94d84f74685ea24ec3083225c4c2c9e7e25b272 Mon Sep 17 00:00:00 2001
From: yufeigu
Date: Fri, 5 Nov 2021 15:54:04 -0700
Subject: [PATCH 18/21] Remove unused import.

---
 data/src/main/java/org/apache/iceberg/data/DeleteFilter.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 37eee4a2655a..cf261720a26f 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -56,7 +56,6 @@
 import org.apache.iceberg.util.Filter;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructProjection;
-import org.roaringbitmap.longlong.Roaring64Bitmap;

 public abstract class DeleteFilter<T> {
   private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;

From 9777e68c97e95670e70983521ccb70376c1b7f1f Mon Sep 17 00:00:00 2001
From: yufeigu
Date: Fri, 5 Nov 2021 16:23:13 -0700
Subject: [PATCH 19/21] Remove roaringBitmap in iceberg-data.

---
 build.gradle | 1 -
 1 file changed, 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index eb06cbe4b4d7..38388747d25b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -253,7 +253,6 @@ project(':iceberg-data') {
       exclude group: 'it.unimi.dsi'
       exclude group: 'org.codehaus.jackson'
     }
-    implementation "org.roaringbitmap:RoaringBitmap"

     compileOnly "org.apache.avro:avro"

From c8e2c54490c51dc260e137ac2bf40ea062021987 Mon Sep 17 00:00:00 2001
From: yufeigu
Date: Fri, 5 Nov 2021 16:28:17 -0700
Subject: [PATCH 20/21] Exclude and shade roaringbitmap in Spark3.1

---
 spark/v3.1/build.gradle | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle
index 86ecc8ffe6a8..510185a75581 100644
--- a/spark/v3.1/build.gradle
+++ b/spark/v3.1/build.gradle
@@ -135,6 +135,7 @@ project(":iceberg-spark:iceberg-spark-3.1-extensions") {
     compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
+      exclude group: 'org.roaringbitmap'
     }

     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
@@ -243,6 +244,7 @@ project(':iceberg-spark:iceberg-spark-3.1-runtime') {
     relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
     // relocate Antlr runtime and related deps to shade Iceberg specific version
     relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4'
+    relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'

     classifier null
   }

From 617a19cd62f9df219d97e9dcc893de602e3d1b56 Mon Sep 17 00:00:00 2001
From: yufeigu
Date: Mon, 8 Nov 2021 20:24:29 -0800
Subject: [PATCH 21/21] Make BitmapPositionDeleteIndex package-private.
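
With the class and its constructor no longer public, code outside org.apache.iceberg.deletes can only
program against the PositionDeleteIndex interface; the Spark test below therefore replaces the
bitmap-backed index with its own simple Set-based implementation. For orientation only, the
package-private class delegates to Roaring64Bitmap roughly as sketched here (an approximation, not
the exact class in core; the range overload in particular may be implemented differently):

// Approximate, illustrative sketch of the bitmap-backed index; not the committed class.
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.roaringbitmap.longlong.Roaring64Bitmap;

class BitmapPositionDeleteIndexSketch implements PositionDeleteIndex {
  private final Roaring64Bitmap roaring64Bitmap = new Roaring64Bitmap();

  @Override
  public void delete(long position) {
    roaring64Bitmap.add(position);                    // mark one position as deleted
  }

  @Override
  public void delete(long posStart, long posEnd) {
    for (long pos = posStart; pos < posEnd; pos++) {  // [posStart, posEnd)
      roaring64Bitmap.add(pos);
    }
  }

  @Override
  public boolean deleted(long position) {
    return roaring64Bitmap.contains(position);        // membership test on the bitmap
  }
}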
---
 .../deletes/BitmapPositionDeleteIndex.java      |  4 +--
 .../TestSparkParquetReadMetadataColumns.java    | 30 +++++++++++++++++--
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index 291d8eebdb91..e7675c357bf1 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -21,10 +21,10 @@

 import org.roaringbitmap.longlong.Roaring64Bitmap;

-public class BitmapPositionDeleteIndex implements PositionDeleteIndex {
+class BitmapPositionDeleteIndex implements PositionDeleteIndex {
   private final Roaring64Bitmap roaring64Bitmap;

-  public BitmapPositionDeleteIndex() {
+  BitmapPositionDeleteIndex() {
     roaring64Bitmap = new Roaring64Bitmap();
   }

diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index d86b979c8612..8e42c6e3d15e 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -31,7 +32,6 @@
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.deletes.BitmapPositionDeleteIndex;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -42,6 +42,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.Types;
@@ -184,7 +185,7 @@ public void testReadRowNumbersWithDelete() throws IOException {
     DeleteFilter deleteFilter = mock(DeleteFilter.class);
     when(deleteFilter.hasPosDeletes()).thenReturn(true);

-    PositionDeleteIndex deletedRowPos = new BitmapPositionDeleteIndex();
+    PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
     deletedRowPos.delete(98, 103);
     when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos);

@@ -196,6 +197,31 @@ public void testReadRowNumbersWithDelete() throws IOException {
     }
   }

+  private class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+    private final Set<Long> deleteIndex;
+
+    private CustomizedPositionDeleteIndex() {
+      deleteIndex = Sets.newHashSet();
+    }
+
+    @Override
+    public void delete(long position) {
+      deleteIndex.add(position);
+    }
+
+    @Override
+    public void delete(long posStart, long posEnd) {
+      for (long l = posStart; l < posEnd; l++) {
+        delete(l);
+      }
+    }
+
+    @Override
+    public boolean deleted(long position) {
+      return deleteIndex.contains(position);
+    }
+  }
+
   @Test
  public void testReadRowNumbersWithFilter() throws IOException {
    // current iceberg supports row group filter.