diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 19bcdd672157..b758e9b2c09f 100644 --- a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateProperties; @@ -196,4 +197,8 @@ protected void withTableProperties(Map props, Action action) { restoreProperties.commit(); } } + + protected FileFormat fileFormat() { + throw new UnsupportedOperationException("Unsupported file format"); + } } diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java new file mode 100644 index 000000000000..9a1aa721ca2c --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class IcebergSourceDeleteBenchmark extends IcebergSourceBenchmark { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class); + private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024; + + protected static final int NUM_FILES = 1; + protected static final int NUM_ROWS = 10 * 1000 * 1000; + + @Setup + public void setupBenchmark() throws IOException { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties( + tableProperties, + () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readIcebergVectorized() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true"); + withTableProperties( + tableProperties, + () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + protected abstract void appendData() throws IOException; + + protected void writeData(int fileNum) { + Dataset df = + spark() + .range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), fileNum)) + .withColumn("timestampCol", 
expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + appendAsFile(df); + } + + @Override + protected Table initTable() { + Schema schema = + new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + Map properties = Maps.newHashMap(); + properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); + properties.put(TableProperties.FORMAT_VERSION, "2"); + return tables.create(schema, partitionSpec, properties, newTableLocation()); + } + + @Override + protected Configuration initHadoopConf() { + return new Configuration(); + } + + protected void writePosDeletes(CharSequence path, long numRows, double percentage) + throws IOException { + writePosDeletes(path, numRows, percentage, 1); + } + + protected void writePosDeletes( + CharSequence path, long numRows, double percentage, int numDeleteFile) throws IOException { + writePosDeletesWithNoise(path, numRows, percentage, 0, numDeleteFile); + } + + protected void writePosDeletesWithNoise( + CharSequence path, long numRows, double percentage, int numNoise, int numDeleteFile) + throws IOException { + Set deletedPos = Sets.newHashSet(); + while (deletedPos.size() < numRows * percentage) { + deletedPos.add(ThreadLocalRandom.current().nextLong(numRows)); + } + LOG.info("pos delete row count: {}, num of delete files: {}", deletedPos.size(), numDeleteFile); + + int partitionSize = (int) (numRows * percentage) / numDeleteFile; + Iterable> sets = Iterables.partition(deletedPos, partitionSize); + for (List item : sets) { + writePosDeletes(path, item, numNoise); + } + } + + protected void writePosDeletes(CharSequence path, List deletedPos, int numNoise) + throws IOException { + OutputFileFactory fileFactory = newFileFactory(); + SparkFileWriterFactory writerFactory = + SparkFileWriterFactory.builderFor(table()).dataFileFormat(fileFormat()).build(); + + ClusteredPositionDeleteWriter writer = + new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table().io(), TARGET_FILE_SIZE_IN_BYTES); + + PartitionSpec unpartitionedSpec = table().specs().get(0); + + PositionDelete positionDelete = PositionDelete.create(); + try (ClusteredPositionDeleteWriter closeableWriter = writer) { + for (Long pos : deletedPos) { + positionDelete.set(path, pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + for (int i = 0; i < numNoise; i++) { + positionDelete.set(noisePath(path), pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + } + } + } + + RowDelta rowDelta = table().newRowDelta(); + writer.result().deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.validateDeletedFiles().commit(); + } + + private OutputFileFactory newFileFactory() { + return OutputFileFactory.builderFor(table(), 1, 1).format(fileFormat()).build(); + } + + private CharSequence noisePath(CharSequence path) { + // assume the data file name would be something like + // "00000-0-30da64e0-56b5-4743-a11b-3188a1695bf7-00001.parquet" + // so the dataFileSuffixLen is the UUID string length + length of "-00001.parquet", which is 36 + // + 14 = 60. 
It's OK + // to be not accurate here. + int dataFileSuffixLen = 60; + UUID uuid = UUID.randomUUID(); + if (path.length() > dataFileSuffixLen) { + return path.subSequence(0, dataFileSuffixLen) + uuid.toString(); + } else { + return uuid.toString(); + } + } +} diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java new file mode 100644 index 000000000000..ab3bf892b0ab --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the + * Spark data source for Iceberg. + * + *

This class uses a dataset with a flat schema. To run this benchmark for spark-3.2: + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetDeleteBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"}) + private double percentDeleteRow; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + if (percentDeleteRow > 0) { + // add pos-deletes + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, percentDeleteRow); + } + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java new file mode 100644 index 000000000000..c4a15ee4cbaf --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the + * Spark data source for Iceberg. + * + *

This class uses a dataset with a flat schema. To run this benchmark for spark-3.2: + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetMultiDeleteFileBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-multi-delete-file-benchmark-result.txt + * + */ +public class IcebergSourceParquetMultiDeleteFileBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"1", "2", "5", "10"}) + private int numDeleteFile; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, 0.25, numDeleteFile); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java new file mode 100644 index 000000000000..9d7271945a46 --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the + * Spark data source for Iceberg. + * + *

This class uses a dataset with a flat schema. To run this benchmark for spark-3.2: + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetWithUnrelatedDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-with-unrelated-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetWithUnrelatedDeleteBenchmark extends IcebergSourceDeleteBenchmark { + private static final double PERCENT_DELETE_ROW = 0.05; + + @Param({"0", "0.05", "0.25", "0.5"}) + private double percentUnrelatedDeletes; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletesWithNoise( + file.path(), + NUM_ROWS, + PERCENT_DELETE_ROW, + (int) (percentUnrelatedDeletes / PERCENT_DELETE_ROW), + 1); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java new file mode 100644 index 000000000000..2372d6fddd4a --- /dev/null +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.data.vectorized; + +import org.apache.iceberg.arrow.vectorized.VectorHolder; +import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.unsafe.types.UTF8String; + +public class ColumnVectorWithFilter extends IcebergArrowColumnVector { + private final int[] rowIdMapping; + + public ColumnVectorWithFilter(VectorHolder holder, int[] rowIdMapping) { + super(holder); + this.rowIdMapping = rowIdMapping; + } + + @Override + public boolean isNullAt(int rowId) { + return nullabilityHolder().isNullAt(rowIdMapping[rowId]) == 1; + } + + @Override + public boolean getBoolean(int rowId) { + return accessor().getBoolean(rowIdMapping[rowId]); + } + + @Override + public int getInt(int rowId) { + return accessor().getInt(rowIdMapping[rowId]); + } + + @Override + public long getLong(int rowId) { + return accessor().getLong(rowIdMapping[rowId]); + } + + @Override + public float getFloat(int rowId) { + return accessor().getFloat(rowIdMapping[rowId]); + } + + @Override + public double getDouble(int rowId) { + return accessor().getDouble(rowIdMapping[rowId]); + } + + @Override + public ColumnarArray getArray(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getArray(rowIdMapping[rowId]); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getDecimal(rowIdMapping[rowId], precision, scale); + } + + @Override + public UTF8String getUTF8String(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getUTF8String(rowIdMapping[rowId]); + } + + @Override + public byte[] getBinary(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getBinary(rowIdMapping[rowId]); + } + + public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) { + return holder.isDummy() + ? 
new ConstantColumnVector( + Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) + : new ColumnVectorWithFilter(holder, rowIdMapping); + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f761b2eb551b..72b1345fa867 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -19,10 +19,18 @@ package org.apache.iceberg.spark.data.vectorized; import java.util.List; +import java.util.Map; import org.apache.iceberg.arrow.vectorized.BaseBatchReader; import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Pair; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -32,11 +40,24 @@ * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}. */ public class ColumnarBatchReader extends BaseBatchReader { + private DeleteFilter deletes = null; + private long rowStartPosInBatch = 0; public ColumnarBatchReader(List> readers) { super(readers); } + @Override + public void setRowGroupInfo( + PageReadStore pageStore, Map metaData, long rowPosition) { + super.setRowGroupInfo(pageStore, metaData, rowPosition); + this.rowStartPosInBatch = rowPosition; + } + + public void setDeleteFilter(DeleteFilter deleteFilter) { + this.deletes = deleteFilter; + } + @Override public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { Preconditions.checkArgument( @@ -47,6 +68,8 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { closeVectors(); } + Pair rowIdMapping = rowIdMapping(numRowsToRead); + for (int i = 0; i < readers.length; i += 1) { vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead); int numRowsInVector = vectorHolders[i].numValues(); @@ -55,10 +78,69 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { "Number of rows in the vector %s didn't match expected %s ", numRowsInVector, numRowsToRead); - arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + + if (rowIdMapping == null) { + arrowColumnVectors[i] = + IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + } else { + int[] rowIdMap = rowIdMapping.first(); + Integer numRows = rowIdMapping.second(); + arrowColumnVectors[i] = + ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows); + } } + + rowStartPosInBatch += numRowsToRead; ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors); - batch.setNumRows(numRowsToRead); + + if (rowIdMapping == null) { + batch.setNumRows(numRowsToRead); + } else { + Integer numRows = rowIdMapping.second(); + batch.setNumRows(numRows); + } return batch; } + + private Pair rowIdMapping(int numRows) { + if (deletes != null && deletes.hasPosDeletes()) { + 
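+      // Rather than copying surviving rows, a per-batch mapping from new row ids to the original
+      // positions that are not marked deleted is built; the batch row count is later shrunk to
+      // the number of surviving entries in that mapping.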
return buildRowIdMapping(deletes.deletedRowPositions(), numRows); + } else { + return null; + } + } + + /** + * Build a row id mapping inside a batch, which skips delete rows. For example, if the 1st and 3rd + * rows are deleted in a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new + * num of rows is 3. + * + * @param deletedRowPositions a set of deleted row positions + * @param numRows the num of rows + * @return the mapping array and the new num of rows in a batch, null if no row is deleted + */ + private Pair buildRowIdMapping( + PositionDeleteIndex deletedRowPositions, int numRows) { + if (deletedRowPositions == null) { + return null; + } + + int[] rowIdMapping = new int[numRows]; + int originalRowId = 0; + int currentRowId = 0; + while (originalRowId < numRows) { + if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { + rowIdMapping[currentRowId] = originalRowId; + currentRowId++; + } + originalRowId++; + } + + if (currentRowId == numRows) { + // there is no delete in this batch + return null; + } else { + return Pair.of(rowIdMapping, currentRowId); + } + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java index 33c1a5284818..26a12c239f86 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java @@ -48,6 +48,14 @@ public IcebergArrowColumnVector(VectorHolder holder) { this.accessor = ArrowVectorAccessors.getVectorAccessor(holder); } + protected ArrowVectorAccessor accessor() { + return accessor; + } + + protected NullabilityHolder nullabilityHolder() { + return nullabilityHolder; + } + @Override public void close() { accessor.close(); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index bbb63e077bc6..bf85bdb7ed05 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -18,12 +18,17 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.catalyst.InternalRow; public class VectorizedSparkParquetReaders { @@ -50,4 +55,47 @@ public static ColumnarBatchReader buildReader( idToConstant, ColumnarBatchReader::new)); } + + public static ColumnarBatchReader buildReader( + Schema expectedSchema, + MessageType fileSchema, + boolean setArrowValidityVector, + Map idToConstant, + DeleteFilter deleteFilter) { + return (ColumnarBatchReader) + TypeWithSchemaVisitor.visit( + expectedSchema.asStruct(), + fileSchema, + new ReaderBuilder( + expectedSchema, + fileSchema, + setArrowValidityVector, + 
idToConstant, + ColumnarBatchReader::new, + deleteFilter)); + } + + private static class ReaderBuilder extends VectorizedReaderBuilder { + private final DeleteFilter deleteFilter; + + ReaderBuilder( + Schema expectedSchema, + MessageType parquetSchema, + boolean setArrowValidityVector, + Map idToConstant, + Function>, VectorizedReader> readerFactory, + DeleteFilter deleteFilter) { + super(expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory); + this.deleteFilter = deleteFilter; + } + + @Override + protected VectorizedReader vectorizedReader(List> reorderedFields) { + VectorizedReader reader = super.vectorizedReader(reorderedFields); + if (deleteFilter != null) { + ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter); + } + return reader; + } + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java index 2cab8ee238e0..f3ddd50eef4b 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java @@ -96,6 +96,10 @@ abstract class BaseDataReader implements Closeable { this.currentIterator = CloseableIterator.empty(); } + protected Table table() { + return table; + } + public boolean next() throws IOException { try { while (true) { diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index d620faa979f6..35d0a9cbac23 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -27,8 +27,10 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; @@ -37,10 +39,12 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.TypeUtil; import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnarBatch; class BatchDataReader extends BaseDataReader { @@ -71,6 +75,8 @@ CloseableIterator open(FileScanTask task) { InputFile location = getInputFile(task); Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); if (task.file().format() == FileFormat.PARQUET) { + SparkDeleteFilter deleteFilter = deleteFilter(task); + Parquet.ReadBuilder builder = Parquet.read(location) .project(expectedSchema) @@ -81,7 +87,8 @@ CloseableIterator open(FileScanTask task) { expectedSchema, fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, - idToConstant)) + idToConstant, + deleteFilter)) .recordsPerBatch(batchSize) 
.filter(task.residual()) .caseSensitive(caseSensitive) @@ -127,4 +134,29 @@ CloseableIterator open(FileScanTask task) { } return iter.iterator(); } + + private SparkDeleteFilter deleteFilter(FileScanTask task) { + return task.deletes().isEmpty() + ? null + : new SparkDeleteFilter(task, table().schema(), expectedSchema); + } + + private class SparkDeleteFilter extends DeleteFilter { + private final InternalRowWrapper asStructLike; + + SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { + super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema); + this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); + } + + @Override + protected StructLike asStructLike(InternalRow row) { + return asStructLike.wrap(row); + } + + @Override + protected InputFile getInputFile(String location) { + return BatchDataReader.this.getInputFile(location); + } + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 3b3d62d96226..ae73046531ba 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -191,13 +191,16 @@ public PartitionReaderFactory createReaderFactory() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); + boolean hasNoEqDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasEqDeletes); + boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks); - boolean readUsingBatch = - batchReadsEnabled - && hasNoDeleteFiles - && (allOrcFileScanTasks - || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + boolean batchReadOrc = hasNoDeleteFiles && allOrcFileScanTasks; + + boolean batchReadParquet = + hasNoEqDeleteFiles && allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives; + + boolean readUsingBatch = batchReadsEnabled && (batchReadOrc || batchReadParquet); int batchSize = readUsingBatch ? 
batchSize(allParquetFileScanTasks, allOrcFileScanTasks) : 0; diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 929d08f2cdb6..653ebcb01964 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -19,17 +19,22 @@ package org.apache.iceberg.spark.data; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.arrow.vector.NullCheckingForGet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.Files; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; @@ -38,6 +43,8 @@ import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.Types; @@ -160,6 +167,69 @@ public void testReadRowNumbers() throws IOException { readAndValidate(null, null, null, EXPECTED_ROWS); } + @Test + public void testReadRowNumbersWithDelete() throws IOException { + if (vectorized) { + List expectedRowsAfterDelete = Lists.newArrayList(EXPECTED_ROWS); + // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and + // [100, 200) + for (int i = 1; i <= 5; i++) { + expectedRowsAfterDelete.remove(98); + } + + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA); + + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex(); + deletedRowPos.delete(98, 103); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + + builder.createBatchedReaderFunc( + fileSchema -> + VectorizedSparkParquetReaders.buildReader( + PROJECTION_SCHEMA, + fileSchema, + NullCheckingForGet.NULL_CHECKING_ENABLED, + Maps.newHashMap(), + deleteFilter)); + builder.recordsPerBatch(RECORDS_PER_BATCH); + + validate(expectedRowsAfterDelete, builder); + } + } + + private class CustomizedPositionDeleteIndex implements PositionDeleteIndex { + private final Set deleteIndex; + + private CustomizedPositionDeleteIndex() { + deleteIndex = Sets.newHashSet(); + } + + @Override + public void delete(long position) { + deleteIndex.add(position); + } + + @Override + public void delete(long posStart, long posEnd) { + for (long l = posStart; l < posEnd; l++) { + delete(l); + } + } + + @Override + public boolean isDeleted(long position) { + return 
deleteIndex.contains(position); + } + + @Override + public boolean isEmpty() { + return deleteIndex.isEmpty(); + } + } + @Test public void testReadRowNumbersWithFilter() throws IOException { // current iceberg supports row group filter. @@ -216,6 +286,11 @@ private void readAndValidate( builder = builder.split(splitStart, splitLength); } + validate(expected, builder); + } + + private void validate(List expected, Parquet.ReadBuilder builder) + throws IOException { try (CloseableIterable reader = vectorized ? batchesToRows(builder.build()) : builder.build()) { final Iterator actualRows = reader.iterator(); diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 462f34530725..575555d745c2 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -50,6 +50,8 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.Dataset; @@ -60,12 +62,25 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + private final boolean vectorized; + + public TestSparkReaderDeletes(boolean vectorized) { + this.vectorized = vectorized; + } + + @Parameterized.Parameters(name = "vectorized = {0}") + public static Object[][] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } @BeforeClass public static void startMetastoreAndSpark() { @@ -108,7 +123,15 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableOperations ops = ((BaseTable) table).operations(); TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); - + if (vectorized) { + table + .updateProperties() + .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") + .set( + TableProperties.PARQUET_BATCH_SIZE, + "4") // split 7 records to two batches to cover more code paths + .commit(); + } return table; } @@ -242,4 +265,32 @@ public void testReadEqualityDeleteRows() throws IOException { Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size()); Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet); } + + @Test + public void testPosDeletesAllRowsInBatch() throws IOException { + // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all + // deleted. 
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile.path(), 0L), // id = 29
+            Pair.of(dataFile.path(), 1L), // id = 43
+            Pair.of(dataFile.path(), 2L), // id = 61
+            Pair.of(dataFile.path(), 3L) // id = 89
+            );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table
+        .newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89);
+    StructLikeSet actual = rowSet(tableName, table, "*");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
 }
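Note on the row-id mapping used above: the javadoc example in ColumnarBatchReader (the 1st and 3rd rows deleted out of a 5-row batch, giving the mapping {0->1, 1->3, 2->4} and a new row count of 3) can be reproduced with a minimal standalone sketch. This is illustrative only; the class and method names below are hypothetical, and a plain java.util.Set stands in for PositionDeleteIndex.

// RowIdMappingSketch.java -- illustrative sketch only; these names are not part of the patch.
import java.util.Arrays;
import java.util.Set;

public class RowIdMappingSketch {

  // Returns the surviving-row mapping for one batch: result[i] is the position within the batch
  // of the i-th row that is not deleted. The array length is the new row count of the batch.
  static int[] buildRowIdMapping(Set<Long> deletedFilePositions, long rowStartPosInBatch, int numRows) {
    int[] rowIdMapping = new int[numRows];
    int currentRowId = 0;
    for (int originalRowId = 0; originalRowId < numRows; originalRowId++) {
      // the delete index holds file-level positions, so the batch start offset is added
      if (!deletedFilePositions.contains(rowStartPosInBatch + originalRowId)) {
        rowIdMapping[currentRowId++] = originalRowId;
      }
    }
    return Arrays.copyOf(rowIdMapping, currentRowId);
  }

  public static void main(String[] args) {
    // 5-row batch starting at file position 0; the 1st and 3rd rows (positions 0 and 2) are deleted.
    int[] mapping = buildRowIdMapping(Set.of(0L, 2L), 0L, 5);
    // Prints [1, 3, 4]: new row 0 reads original row 1, new row 1 reads row 3, new row 2 reads row 4.
    System.out.println(Arrays.toString(mapping));
  }
}

ColumnVectorWithFilter then resolves every accessor call through that array (for example accessor().getInt(rowIdMapping[rowId])), and the batch's row count is set to the number of surviving entries, which is why a batch whose rows are all deleted simply yields zero rows -- the case exercised by testPosDeletesAllRowsInBatch.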