diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java index 10f4eb0f1acf..76b5fd55d521 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java @@ -42,7 +42,7 @@ protected BaseBatchReader(List<VectorizedReader<?>> readers) { } @Override - public final void setRowGroupInfo( + public void setRowGroupInfo( PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) { for (VectorizedArrowReader reader : readers) { if (reader != null) { diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java index 2bace1e5d53d..ad69d5e87e74 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java @@ -94,6 +94,10 @@ public VectorizedReader<?> message( reorderedFields.add(VectorizedArrowReader.nulls()); } } + return vectorizedReader(reorderedFields); + } + + protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) { return readerFactory.apply(reorderedFields); } diff --git a/build.gradle b/build.gradle index e0097e943c07..38388747d25b 100644 --- a/build.gradle +++ b/build.gradle @@ -215,6 +215,7 @@ project(':iceberg-core') { implementation "com.fasterxml.jackson.core:jackson-databind" implementation "com.fasterxml.jackson.core:jackson-core" implementation "com.github.ben-manes.caffeine:caffeine" + implementation "org.roaringbitmap:RoaringBitmap" compileOnly("org.apache.hadoop:hadoop-client") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.slf4j', module: 'slf4j-log4j12' diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java new file mode 100644 index 000000000000..e7675c357bf1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.deletes; + +import org.roaringbitmap.longlong.Roaring64Bitmap; + +class BitmapPositionDeleteIndex implements PositionDeleteIndex { + private final Roaring64Bitmap roaring64Bitmap; + + BitmapPositionDeleteIndex() { + roaring64Bitmap = new Roaring64Bitmap(); + } + + @Override + public void delete(long position) { + roaring64Bitmap.add(position); + } + + @Override + public void delete(long posStart, long posEnd) { + roaring64Bitmap.add(posStart, posEnd); + } + + @Override + public boolean deleted(long position) { + return roaring64Bitmap.contains(position); + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 62154f7d6071..41c7793c769f 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -107,6 +107,24 @@ public static Set<Long> toPositionSet(CloseableIterable<Long> posDeletes) { } } + public static <T extends StructLike> PositionDeleteIndex toPositionBitmap(CharSequence dataLocation, + List<CloseableIterable<T>> deleteFiles) { + DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocation); + List<CloseableIterable<Long>> positions = Lists.transform(deleteFiles, deletes -> + CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row))); + return toPositionBitmap(CloseableIterable.concat(positions)); + } + + public static PositionDeleteIndex toPositionBitmap(CloseableIterable<Long> posDeletes) { + try (CloseableIterable<Long> deletes = posDeletes) { + PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(); + deletes.forEach(positionDeleteIndex::delete); + return positionDeleteIndex; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close position delete source", e); + } + } + + public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows, Function<T, Long> rowToPosition, CloseableIterable<Long> posDeletes) { diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java new file mode 100644 index 000000000000..73ef397a9450 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.deletes; + +public interface PositionDeleteIndex { + /** + * Set a deleted row position. + * @param position the deleted row position + */ + void delete(long position); + + /** + * Set a range of deleted row positions. + * @param posStart inclusive beginning of position range + * @param posEnd exclusive ending of position range + */ + void delete(long posStart, long posEnd); + + /** + * Checks whether a row at the position is deleted.
+ * @param position deleted row position + * @return whether the position is deleted + */ + boolean deleted(long position); +} diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java index e90cb8f145c7..d8d2d68ba7dc 100644 --- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java @@ -23,6 +23,7 @@ import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.ContentFile; +import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -37,6 +38,15 @@ public static boolean hasDeletes(CombinedScanTask task) { return task.files().stream().anyMatch(TableScanUtil::hasDeletes); } + /** + * This is temporarily introduced since we plan to support pos-delete vectorized read first, then get to the + * equality-delete support. We will remove this method once both are supported. + */ + public static boolean hasEqDeletes(CombinedScanTask task) { + return task.files().stream().anyMatch( + t -> t.deletes().stream().anyMatch(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))); + } + public static boolean hasDeletes(FileScanTask task) { return !task.deletes().isEmpty(); } diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index c4123d554d38..cf261720a26f 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -37,6 +37,7 @@ import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; @@ -69,6 +70,8 @@ public abstract class DeleteFilter { private final Schema requiredSchema; private final Accessor posAccessor; + private PositionDeleteIndex deleteRowPositions = null; + protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; this.dataFile = task.file(); @@ -98,6 +101,10 @@ public Schema requiredSchema() { return requiredSchema; } + public boolean hasPosDeletes() { + return !posDeletes.isEmpty(); + } + Accessor posAccessor() { return posAccessor; } @@ -185,6 +192,18 @@ protected boolean shouldKeep(T item) { return remainingRowsFilter.filter(records); } + public PositionDeleteIndex deletedRowPositions() { + if (posDeletes.isEmpty()) { + return null; + } + + if (deleteRowPositions == null) { + List> deletes = Lists.transform(posDeletes, this::openPosDeletes); + deleteRowPositions = Deletes.toPositionBitmap(dataFile.path(), deletes); + } + return deleteRowPositions; + } + private CloseableIterable applyPosDeletes(CloseableIterable records) { if (posDeletes.isEmpty()) { return records; diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 69b0a572ad73..75e1657d54e6 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ 
b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -75,9 +75,9 @@ public abstract class DeleteReadTests { protected String dateTableName = null; protected Table table = null; protected Table dateTable = null; - private List records = null; + protected List records = null; private List dateRecords = null; - private DataFile dataFile = null; + protected DataFile dataFile = null; @Before public void writeTestDataFile() throws IOException { @@ -298,6 +298,39 @@ public void testPositionDeletes() throws IOException { Assert.assertEquals("Table should contain expected rows", expected, actual); } + @Test + public void testMultiplePosDeleteFiles() throws IOException { + List> deletes = Lists.newArrayList( + Pair.of(dataFile.path(), 0L), // id = 29 + Pair.of(dataFile.path(), 3L) // id = 89 + ); + + Pair posDeletes = FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), Row.of(0), deletes); + + table.newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + deletes = Lists.newArrayList( + Pair.of(dataFile.path(), 6L) // id = 122 + ); + + posDeletes = FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), Row.of(0), deletes); + + table.newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); + StructLikeSet actual = rowSet(tableName, table, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } + @Test public void testMixedPositionAndEqualityDeletes() throws IOException { Schema dataSchema = table.schema().select("data"); @@ -411,7 +444,7 @@ private StructLikeSet selectColumns(StructLikeSet rows, String... columns) { return set; } - private static StructLikeSet rowSetWithoutIds(Table table, List recordList, int... idsToRemove) { + protected static StructLikeSet rowSetWithoutIds(Table table, List recordList, int... 
idsToRemove) { Set deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove)); StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); recordList.stream() diff --git a/spark/v2.4/build.gradle b/spark/v2.4/build.gradle index 0789f12fcc5b..22f4abb03f12 100644 --- a/spark/v2.4/build.gradle +++ b/spark/v2.4/build.gradle @@ -65,6 +65,7 @@ project(':iceberg-spark:iceberg-spark2') { compileOnly "org.apache.avro:avro" compileOnly("org.apache.spark:spark-hive_2.11") { exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.roaringbitmap' } implementation("org.apache.orc:orc-core::nohive") { @@ -159,6 +160,7 @@ project(':iceberg-spark:iceberg-spark-runtime') { relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow' relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch' relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' + relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap' classifier null } diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle index 7df4c62e5253..2ad0fb009d88 100644 --- a/spark/v3.0/build.gradle +++ b/spark/v3.0/build.gradle @@ -64,6 +64,7 @@ project(':iceberg-spark:iceberg-spark3') { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' + exclude group: 'org.roaringbitmap' } implementation("org.apache.orc:orc-core::nohive") { @@ -134,6 +135,7 @@ project(":iceberg-spark:iceberg-spark3-extensions") { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' + exclude group: 'org.roaringbitmap' } testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') @@ -242,6 +244,7 @@ project(':iceberg-spark:iceberg-spark3-runtime') { relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' // relocate Antlr runtime and related deps to shade Iceberg specific version relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4' + relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap' classifier null } diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle index 94befdf59edd..510185a75581 100644 --- a/spark/v3.1/build.gradle +++ b/spark/v3.1/build.gradle @@ -64,6 +64,7 @@ project(':iceberg-spark:iceberg-spark-3.1') { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' + exclude group: 'org.roaringbitmap' } implementation("org.apache.orc:orc-core::nohive") { @@ -134,6 +135,7 @@ project(":iceberg-spark:iceberg-spark-3.1-extensions") { compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.apache.arrow' + exclude group: 'org.roaringbitmap' } testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') @@ -242,6 +244,7 @@ project(':iceberg-spark:iceberg-spark-3.1-runtime') { relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' // relocate Antlr runtime and related deps to shade Iceberg specific version relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4' + relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap' classifier null } diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index 2a2ff62f8d15..8b0200fb99a9 100644 --- 
a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -66,6 +66,7 @@ project(':iceberg-spark:iceberg-spark-3.2') { exclude group: 'org.apache.arrow' // to make sure io.netty.buffer only comes from project(':iceberg-arrow') exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'org.roaringbitmap' } implementation("org.apache.orc:orc-core::nohive") { @@ -138,6 +139,7 @@ project(":iceberg-spark:iceberg-spark-3.2-extensions") { exclude group: 'org.apache.arrow' // to make sure io.netty.buffer only comes from project(':iceberg-arrow') exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'org.roaringbitmap' } testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') @@ -244,6 +246,7 @@ project(':iceberg-spark:iceberg-spark-3.2-runtime') { relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' // relocate Antlr runtime and related deps to shade Iceberg specific version relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4' + relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap' classifier null } diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 2be6a8c538d3..ca1e92fdb7ae 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateProperties; @@ -187,4 +188,8 @@ protected void withTableProperties(Map props, Action action) { restoreProperties.commit(); } } + + protected FileFormat fileFormat() { + throw new UnsupportedOperationException("Unsupported file format"); + } } diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java new file mode 100644 index 000000000000..d5a6db3b385c --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +public abstract class IcebergSourceDeleteBenchmark extends IcebergSourceBenchmark { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class); + private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024; + + protected static final int NUM_FILES = 1; + protected static final int NUM_ROWS = 10 * 1000 * 1000; + + @Setup + public void setupBenchmark() throws IOException { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readIcebergVectorized() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true"); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + protected abstract void appendData() throws IOException; + + protected void writeData(int fileNum) { + Dataset df = spark().range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), fileNum)) + .withColumn("timestampCol", 
expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + appendAsFile(df); + } + + @Override + protected Table initTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + Map properties = Maps.newHashMap(); + properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); + properties.put(TableProperties.FORMAT_VERSION, "2"); + return tables.create(schema, partitionSpec, properties, newTableLocation()); + } + + @Override + protected Configuration initHadoopConf() { + return new Configuration(); + } + + protected void writePosDeletes(CharSequence path, long numRows, double percentage) throws IOException { + writePosDeletes(path, numRows, percentage, 1); + } + + protected void writePosDeletes(CharSequence path, long numRows, double percentage, + int numDeleteFile) throws IOException { + writePosDeletesWithNoise(path, numRows, percentage, 0, numDeleteFile); + } + + protected void writePosDeletesWithNoise(CharSequence path, long numRows, double percentage, int numNoise, + int numDeleteFile) throws IOException { + Set deletedPos = Sets.newHashSet(); + while (deletedPos.size() < numRows * percentage) { + deletedPos.add(ThreadLocalRandom.current().nextLong(numRows)); + } + LOG.info("pos delete row count: {}, num of delete files: {}", deletedPos.size(), numDeleteFile); + + int partitionSize = (int) (numRows * percentage) / numDeleteFile; + Iterable> sets = Iterables.partition(deletedPos, partitionSize); + for (List item : sets) { + writePosDeletes(path, item, numNoise); + } + } + + protected void writePosDeletes(CharSequence path, List deletedPos, int numNoise) throws IOException { + OutputFileFactory fileFactory = newFileFactory(); + SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table()) + .dataFileFormat(fileFormat()) + .build(); + + ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table().io(), + fileFormat(), TARGET_FILE_SIZE_IN_BYTES); + + PartitionSpec unpartitionedSpec = table().specs().get(0); + + PositionDelete positionDelete = PositionDelete.create(); + try (ClusteredPositionDeleteWriter closeableWriter = writer) { + for (Long pos : deletedPos) { + positionDelete.set(path, pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + for (int i = 0; i < numNoise; i++) { + positionDelete.set(noisePath(path), pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + } + } + } + + RowDelta rowDelta = table().newRowDelta(); + writer.result().deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.validateDeletedFiles().commit(); + } + + private OutputFileFactory newFileFactory() { + return OutputFileFactory.builderFor(table(), 1, 1) + .format(fileFormat()) + .build(); + } + + private CharSequence noisePath(CharSequence path) { + // assume the data file name would be something like "00000-0-30da64e0-56b5-4743-a11b-3188a1695bf7-00001.parquet" + // so the dataFileSuffixLen is the UUID string length + length of "-00001.parquet", which is 36 + 14 = 60. 
It's OK + // to be not accurate here. + int dataFileSuffixLen = 60; + UUID uuid = UUID.randomUUID(); + if (path.length() > dataFileSuffixLen) { + return path.subSequence(0, dataFileSuffixLen) + uuid.toString(); + } else { + return uuid.toString(); + } + } +} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java new file mode 100644 index 000000000000..234c6c5666ca --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for + * Iceberg. + *
+ * This class uses a dataset with a flat schema. + * To run this benchmark for spark-3.2: + * + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetDeleteBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"}) + private double percentDeleteRow; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + if (percentDeleteRow > 0) { + // add pos-deletes + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, percentDeleteRow); + } + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java new file mode 100644 index 000000000000..fb3933186c4f --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for + * Iceberg. + *
+ * This class uses a dataset with a flat schema. + * To run this benchmark for spark-3.2: + * + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetMultiDeleteFileBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-multi-delete-file-benchmark-result.txt + * + */ +public class IcebergSourceParquetMultiDeleteFileBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"1", "2", "5", "10"}) + private int numDeleteFile; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, 0.25, numDeleteFile); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java new file mode 100644 index 000000000000..a06cadb1e525 --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for + * Iceberg. + *
+ * This class uses a dataset with a flat schema. + * To run this benchmark for spark-3.2: + * + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetWithUnrelatedDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-with-unrelated-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetWithUnrelatedDeleteBenchmark extends IcebergSourceDeleteBenchmark { + private static final double PERCENT_DELETE_ROW = 0.05; + @Param({"0", "0.05", "0.25", "0.5"}) + private double percentUnrelatedDeletes; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletesWithNoise(file.path(), NUM_ROWS, PERCENT_DELETE_ROW, + (int) (percentUnrelatedDeletes / PERCENT_DELETE_ROW), 1); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java new file mode 100644 index 000000000000..7804c02f9042 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.data.vectorized; + +import org.apache.iceberg.arrow.vectorized.VectorHolder; +import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.unsafe.types.UTF8String; + +public class ColumnVectorWithFilter extends IcebergArrowColumnVector { + private final int[] rowIdMapping; + + public ColumnVectorWithFilter(VectorHolder holder, int[] rowIdMapping) { + super(holder); + this.rowIdMapping = rowIdMapping; + } + + @Override + public boolean isNullAt(int rowId) { + return nullabilityHolder().isNullAt(rowIdMapping[rowId]) == 1; + } + + @Override + public boolean getBoolean(int rowId) { + return accessor().getBoolean(rowIdMapping[rowId]); + } + + @Override + public int getInt(int rowId) { + return accessor().getInt(rowIdMapping[rowId]); + } + + @Override + public long getLong(int rowId) { + return accessor().getLong(rowIdMapping[rowId]); + } + + @Override + public float getFloat(int rowId) { + return accessor().getFloat(rowIdMapping[rowId]); + } + + @Override + public double getDouble(int rowId) { + return accessor().getDouble(rowIdMapping[rowId]); + } + + @Override + public ColumnarArray getArray(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getArray(rowIdMapping[rowId]); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getDecimal(rowIdMapping[rowId], precision, scale); + } + + @Override + public UTF8String getUTF8String(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getUTF8String(rowIdMapping[rowId]); + } + + @Override + public byte[] getBinary(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getBinary(rowIdMapping[rowId]); + } + + public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) { + return holder.isDummy() ? 
+ new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) : + new ColumnVectorWithFilter(holder, rowIdMapping); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f71a6968099c..cc4858f1d61b 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -20,10 +20,18 @@ package org.apache.iceberg.spark.data.vectorized; import java.util.List; +import java.util.Map; import org.apache.iceberg.arrow.vectorized.BaseBatchReader; import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Pair; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -33,11 +41,24 @@ * {@linkplain VectorizedArrowReader VectorReader(s)}. */ public class ColumnarBatchReader extends BaseBatchReader { + private DeleteFilter deletes = null; + private long rowStartPosInBatch = 0; public ColumnarBatchReader(List> readers) { super(readers); } + @Override + public void setRowGroupInfo(PageReadStore pageStore, Map metaData, + long rowPosition) { + super.setRowGroupInfo(pageStore, metaData, rowPosition); + this.rowStartPosInBatch = rowPosition; + } + + public void setDeleteFilter(DeleteFilter deleteFilter) { + this.deletes = deleteFilter; + } + @Override public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead); @@ -47,6 +68,8 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { closeVectors(); } + Pair rowIdMapping = rowIdMapping(numRowsToRead); + for (int i = 0; i < readers.length; i += 1) { vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead); int numRowsInVector = vectorHolders[i].numValues(); @@ -54,11 +77,64 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { numRowsInVector == numRowsToRead, "Number of rows in the vector %s didn't match expected %s ", numRowsInVector, numRowsToRead); - arrowColumnVectors[i] = - IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + + if (rowIdMapping == null) { + arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + } else { + int[] rowIdMap = rowIdMapping.first(); + Integer numRows = rowIdMapping.second(); + arrowColumnVectors[i] = ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows); + } } + + rowStartPosInBatch += numRowsToRead; ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors); - batch.setNumRows(numRowsToRead); + + if (rowIdMapping == null) { + batch.setNumRows(numRowsToRead); + } else { + Integer numRows = rowIdMapping.second(); + batch.setNumRows(numRows); + } return batch; } + + private Pair rowIdMapping(int 
numRows) { + if (deletes != null && deletes.hasPosDeletes()) { + return buildRowIdMapping(deletes.deletedRowPositions(), numRows); + } else { + return null; + } + } + + /** + * Build a row id mapping inside a batch, which skips delete rows. For example, if the 1st and 3rd rows are deleted in + * a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new num of rows is 3. + * @param deletedRowPositions a set of deleted row positions + * @param numRows the num of rows + * @return the mapping array and the new num of rows in a batch, null if no row is deleted + */ + private Pair buildRowIdMapping(PositionDeleteIndex deletedRowPositions, int numRows) { + if (deletedRowPositions == null) { + return null; + } + + int[] rowIdMapping = new int[numRows]; + int originalRowId = 0; + int currentRowId = 0; + while (originalRowId < numRows) { + if (!deletedRowPositions.deleted(originalRowId + rowStartPosInBatch)) { + rowIdMapping[currentRowId] = originalRowId; + currentRowId++; + } + originalRowId++; + } + + if (currentRowId == numRows) { + // there is no delete in this batch + return null; + } else { + return Pair.of(rowIdMapping, currentRowId); + } + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java index 514eec84fe82..c5b7853d5353 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java @@ -48,6 +48,14 @@ public IcebergArrowColumnVector(VectorHolder holder) { this.accessor = ArrowVectorAccessors.getVectorAccessor(holder); } + protected ArrowVectorAccessor accessor() { + return accessor; + } + + protected NullabilityHolder nullabilityHolder() { + return nullabilityHolder; + } + @Override public void close() { accessor.close(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index b2d582352d74..020b35f52844 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -19,12 +19,17 @@ package org.apache.iceberg.spark.data.vectorized; +import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.catalyst.InternalRow; public class VectorizedSparkParquetReaders { @@ -49,4 +54,39 @@ public static ColumnarBatchReader buildReader( expectedSchema, fileSchema, setArrowValidityVector, idToConstant, ColumnarBatchReader::new)); } + + public static ColumnarBatchReader buildReader(Schema expectedSchema, + MessageType fileSchema, + boolean setArrowValidityVector, + Map idToConstant, + DeleteFilter deleteFilter) { + return (ColumnarBatchReader) + TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), 
fileSchema, + new ReaderBuilder( + expectedSchema, fileSchema, setArrowValidityVector, + idToConstant, ColumnarBatchReader::new, deleteFilter)); + } + + private static class ReaderBuilder extends VectorizedReaderBuilder { + private final DeleteFilter deleteFilter; + + ReaderBuilder(Schema expectedSchema, + MessageType parquetSchema, + boolean setArrowValidityVector, + Map idToConstant, + Function>, VectorizedReader> readerFactory, + DeleteFilter deleteFilter) { + super(expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory); + this.deleteFilter = deleteFilter; + } + + @Override + protected VectorizedReader vectorizedReader(List> reorderedFields) { + VectorizedReader reader = super.vectorizedReader(reorderedFields); + if (deleteFilter != null) { + ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter); + } + return reader; + } + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java index b58745c7a00d..f0664c7e8e29 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java @@ -91,6 +91,10 @@ abstract class BaseDataReader implements Closeable { this.currentIterator = CloseableIterator.empty(); } + protected Table table() { + return table; + } + public boolean next() throws IOException { try { while (true) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index e4bd3ceba6ce..5f05c55789ed 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -28,8 +28,10 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; @@ -38,10 +40,12 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.TypeUtil; import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnarBatch; class BatchDataReader extends BaseDataReader { @@ -71,11 +75,14 @@ CloseableIterator open(FileScanTask task) { InputFile location = getInputFile(task); Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); if (task.file().format() == FileFormat.PARQUET) { + SparkDeleteFilter deleteFilter = deleteFilter(task); + Parquet.ReadBuilder builder = Parquet.read(location) .project(expectedSchema) .split(task.start(), task.length()) .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema, - fileSchema, /* 
setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant)) + fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant, + deleteFilter)) .recordsPerBatch(batchSize) .filter(task.residual()) .caseSensitive(caseSensitive) @@ -114,4 +121,27 @@ CloseableIterator open(FileScanTask task) { } return iter.iterator(); } + + private SparkDeleteFilter deleteFilter(FileScanTask task) { + return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema); + } + + private class SparkDeleteFilter extends DeleteFilter { + private final InternalRowWrapper asStructLike; + + SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { + super(task, tableSchema, requestedSchema); + this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); + } + + @Override + protected StructLike asStructLike(InternalRow row) { + return asStructLike.wrap(row); + } + + @Override + protected InputFile getInputFile(String location) { + return BatchDataReader.this.getInputFile(location); + } + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index bb7ac9448bc2..e1420ed0911c 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -175,10 +175,15 @@ public PartitionReaderFactory createReaderFactory() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); + boolean hasNoEqDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasEqDeletes); + boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks); - boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + boolean batchReadOrc = hasNoDeleteFiles && allOrcFileScanTasks; + + boolean batchReadParquet = hasNoEqDeleteFiles && allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives; + + boolean readUsingBatch = batchReadsEnabled && (batchReadOrc || batchReadParquet); int batchSize = readUsingBatch ? 
batchSize(allParquetFileScanTasks, allOrcFileScanTasks) : 0; diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index b68db024aab3..8e42c6e3d15e 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -24,12 +24,15 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.arrow.vector.NullCheckingForGet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.Files; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; @@ -38,6 +41,8 @@ import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.Types; @@ -60,6 +65,8 @@ import org.junit.runners.Parameterized; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @RunWith(Parameterized.class) public class TestSparkParquetReadMetadataColumns { @@ -165,6 +172,56 @@ public void testReadRowNumbers() throws IOException { readAndValidate(null, null, null, EXPECTED_ROWS); } + @Test + public void testReadRowNumbersWithDelete() throws IOException { + if (vectorized) { + List expectedRowsAfterDelete = new ArrayList<>(EXPECTED_ROWS); + // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and [100, 200) + for (int i = 1; i <= 5; i++) { + expectedRowsAfterDelete.remove(98); + } + + Parquet.ReadBuilder builder = Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA); + + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex(); + deletedRowPos.delete(98, 103); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + + builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA, + fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, Maps.newHashMap(), deleteFilter)); + builder.recordsPerBatch(RECORDS_PER_BATCH); + + validate(expectedRowsAfterDelete, builder); + } + } + + private class CustomizedPositionDeleteIndex implements PositionDeleteIndex { + private final Set deleteIndex; + + private CustomizedPositionDeleteIndex() { + deleteIndex = Sets.newHashSet(); + } + + @Override + public void delete(long position) { + deleteIndex.add(position); + } + + @Override + public void delete(long posStart, long posEnd) { + for (long l = posStart; l < posEnd; l++) { + delete(l); + } + } + + @Override + public 
boolean deleted(long position) { + return deleteIndex.contains(position); + } + } + @Test public void testReadRowNumbersWithFilter() throws IOException { // current iceberg supports row group filter. @@ -212,6 +269,10 @@ private void readAndValidate(Expression filter, Long splitStart, Long splitLengt builder = builder.split(splitStart, splitLength); } + validate(expected, builder); + } + + private void validate(List expected, Parquet.ReadBuilder builder) throws IOException { try (CloseableIterable reader = vectorized ? batchesToRows(builder.build()) : builder.build()) { final Iterator actualRows = reader.iterator(); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index b7398b7befb2..e1c6355b8e75 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -49,6 +49,8 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.Dataset; @@ -59,14 +61,30 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +@RunWith(Parameterized.class) public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + private final boolean vectorized; + + public TestSparkReaderDeletes(boolean vectorized) { + this.vectorized = vectorized; + } + + @Parameterized.Parameters(name = "vectorized = {0}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {false}, + new Object[] {true} + }; + } @BeforeClass public static void startMetastoreAndSpark() { @@ -106,7 +124,12 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableOperations ops = ((BaseTable) table).operations(); TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); - + if (vectorized) { + table.updateProperties() + .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") + .set(TableProperties.PARQUET_BATCH_SIZE, "4") // split 7 records to two batches to cover more code paths + .commit(); + } return table; } @@ -215,4 +238,28 @@ public void testReadEqualityDeleteRows() throws IOException { Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size()); Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet); } + + @Test + public void testPosDeletesAllRowsInBatch() throws IOException { + // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all deleted. 
+ List> deletes = Lists.newArrayList( + Pair.of(dataFile.path(), 0L), // id = 29 + Pair.of(dataFile.path(), 1L), // id = 43 + Pair.of(dataFile.path(), 2L), // id = 61 + Pair.of(dataFile.path(), 3L) // id = 89 + ); + + Pair posDeletes = FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + + table.newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89); + StructLikeSet actual = rowSet(tableName, table, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } } diff --git a/versions.props b/versions.props index fa6505ad4b38..0becbf59f600 100644 --- a/versions.props +++ b/versions.props @@ -16,6 +16,7 @@ com.google.guava:guava = 28.0-jre com.github.ben-manes.caffeine:caffeine = 2.8.4 org.apache.arrow:arrow-vector = 6.0.0 org.apache.arrow:arrow-memory-netty = 6.0.0 +org.roaringbitmap:RoaringBitmap = 0.9.0 io.netty:netty-buffer = 4.1.63.Final com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1 com.aliyun.oss:aliyun-sdk-oss = 3.10.2
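
The row-id mapping that ColumnarBatchReader.buildRowIdMapping constructs above is the core of the vectorized pos-delete path: instead of copying surviving rows, the batch keeps the Arrow vectors as-is and ColumnVectorWithFilter redirects every column lookup through the mapping. Below is a minimal, self-contained sketch of that remapping logic; the RowIdMappingSketch class name and the plain java.util.Set standing in for PositionDeleteIndex are illustrative only, not part of this change.

import java.util.Arrays;
import java.util.Set;

// Illustrative sketch, not the Iceberg implementation: remap row ids within a batch
// of numRows rows starting at absolute file position rowStartPosInBatch, skipping
// positions present in the deleted set. Mirrors the buildRowIdMapping javadoc example:
// deleting the 1st and 3rd of 5 rows yields the mapping [1, 3, 4] and 3 live rows.
public class RowIdMappingSketch {
  static int[] buildRowIdMapping(Set<Long> deleted, long rowStartPosInBatch, int numRows) {
    int[] rowIdMapping = new int[numRows];
    int liveRows = 0;
    for (int originalRowId = 0; originalRowId < numRows; originalRowId++) {
      if (!deleted.contains(rowStartPosInBatch + originalRowId)) {
        rowIdMapping[liveRows++] = originalRowId;
      }
    }
    // the real reader also carries the live-row count alongside the array; trimming here
    // keeps the sketch short
    return Arrays.copyOf(rowIdMapping, liveRows);
  }

  public static void main(String[] args) {
    int[] mapping = buildRowIdMapping(Set.of(0L, 2L), 0L, 5);
    System.out.println(Arrays.toString(mapping)); // prints [1, 3, 4]
  }
}

In the actual change, the mapping is built against a PositionDeleteIndex backed by a Roaring64Bitmap and is paired with the trimmed row count, which ColumnarBatchReader uses to set the reported number of rows in the returned ColumnarBatch.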