diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
index 9a1aa721ca2c..d970175d651f 100644
--- a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
+++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
@@ -39,15 +39,18 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.ClusteredEqualityDeleteWriter;
 import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.TearDown;
@@ -198,6 +201,57 @@ protected void writePosDeletes(CharSequence path, List<Long> deletedPos, int num
     rowDelta.validateDeletedFiles().commit();
   }
 
+  protected void writeEqDeletes(long numRows, double percentage) throws IOException {
+    Set<Long> deletedValues = Sets.newHashSet();
+    while (deletedValues.size() < numRows * percentage) {
+      deletedValues.add(ThreadLocalRandom.current().nextLong(numRows));
+    }
+
+    List<InternalRow> rows = Lists.newArrayList();
+    for (Long value : deletedValues) {
+      GenericInternalRow genericInternalRow = new GenericInternalRow(7);
+      genericInternalRow.setLong(0, value);
+      genericInternalRow.setInt(1, (int) (value % Integer.MAX_VALUE));
+      genericInternalRow.setFloat(2, (float) value);
+      genericInternalRow.setNullAt(3);
+      genericInternalRow.setNullAt(4);
+      genericInternalRow.setNullAt(5);
+      genericInternalRow.setNullAt(6);
+      rows.add(genericInternalRow);
+    }
+    LOG.info("Num of equality deleted rows: {}", rows.size());
+
+    writeEqDeletes(rows);
+  }
+
+  private void writeEqDeletes(List<InternalRow> rows) throws IOException {
+    int equalityFieldId = table().schema().findField("longCol").fieldId();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory =
+        SparkFileWriterFactory.builderFor(table())
+            .dataFileFormat(fileFormat())
+            .equalityDeleteRowSchema(table().schema())
+            .equalityFieldIds(new int[] {equalityFieldId})
+            .build();
+
+    ClusteredEqualityDeleteWriter<InternalRow> writer =
+        new ClusteredEqualityDeleteWriter<>(
+            writerFactory, fileFactory, table().io(), TARGET_FILE_SIZE_IN_BYTES);
+
+    PartitionSpec unpartitionedSpec = table().specs().get(0);
+    try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
+      for (InternalRow row : rows) {
+        closeableWriter.write(row, unpartitionedSpec, null);
+      }
+    }
+
+    RowDelta rowDelta = table().newRowDelta();
+    LOG.info("Num of Delete File: {}", writer.result().deleteFiles().size());
+    writer.result().deleteFiles().forEach(rowDelta::addDeletes);
+    rowDelta.validateDeletedFiles().commit();
+  }
+
   private OutputFileFactory newFileFactory() {
     return OutputFileFactory.builderFor(table(), 1, 1).format(fileFormat()).build();
   }
diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java
new file mode 100644
index 000000000000..20a47d327e6d
--- /dev/null
+++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark;
+import org.openjdk.jmh.annotations.Param;
+
+/**
+ * A benchmark that evaluates the non-vectorized read and vectorized read with equality deletes in
+ * the Spark data source for Iceberg.
+ *
+ * This class uses a dataset with a flat schema. To run this benchmark for spark-3.1:
+ *   ./gradlew :iceberg-spark:iceberg-spark-3.1:jmh
+ *       -PjmhIncludeRegex=IcebergSourceParquetEqDeleteBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-source-parquet-eq-delete-benchmark-result.txt
+ *
+ */
+public class IcebergSourceParquetEqDeleteBenchmark extends IcebergSourceDeleteBenchmark {
+  @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"})
+  private double percentDeleteRow;
+
+  @Override
+  protected void appendData() throws IOException {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      writeData(fileNum);
+
+      if (percentDeleteRow > 0) {
+        // add equality deletes
+        table().refresh();
+        writeEqDeletes(NUM_ROWS, percentDeleteRow);
+      }
+    }
+  }
+
+  @Override
+  protected FileFormat fileFormat() {
+    return FileFormat.PARQUET;
+  }
+}
diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java
similarity index 95%
rename from spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java
rename to spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java
index ab3bf892b0ab..4852f3bd547f 100644
--- a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java
+++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java
@@ -34,7 +34,7 @@
 *       -PjmhOutputPath=benchmark/iceberg-source-parquet-delete-benchmark-result.txt
 *
 */
-public class IcebergSourceParquetDeleteBenchmark extends IcebergSourceDeleteBenchmark {
+public class IcebergSourceParquetPosDeleteBenchmark extends IcebergSourceDeleteBenchmark {
   @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"})
   private double percentDeleteRow;
 
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index 72b1345fa867..a7ef08dd3ba8 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.data.vectorized;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
@@ -60,87 +61,157 @@ public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
 
   @Override
   public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
-    Preconditions.checkArgument(
-        numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
-    ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
-
     if (reuse == null) {
       closeVectors();
     }
 
-    Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+    ColumnBatchLoader batchLoader = new ColumnBatchLoader(numRowsToRead);
+    rowStartPosInBatch += numRowsToRead;
+    return batchLoader.columnarBatch;
+  }
 
-    for (int i = 0; i < readers.length; i += 1) {
-      vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
-      int numRowsInVector = vectorHolders[i].numValues();
-      Preconditions.checkState(
-          numRowsInVector == numRowsToRead,
-          "Number of rows in the vector %s didn't match expected %s ",
-          numRowsInVector,
-          numRowsToRead);
+  private class ColumnBatchLoader {
+    // the rowId mapping to skip deleted rows for all column vectors inside a batch
+    private int[] rowIdMapping;
+    private int numRows;
+    private ColumnarBatch columnarBatch;
+
+    ColumnBatchLoader(int numRowsToRead) {
+      initRowIdMapping(numRowsToRead);
+      loadDataToColumnBatch(numRowsToRead);
+    }
+
+    ColumnarBatch loadDataToColumnBatch(int numRowsToRead) {
+      Preconditions.checkArgument(
+          numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
+      ColumnVector[] arrowColumnVectors = readDataToColumnVectors(numRowsToRead);
+
+      columnarBatch = new ColumnarBatch(arrowColumnVectors);
+      columnarBatch.setNumRows(numRows);
+
+      if (hasEqDeletes()) {
+        applyEqDelete();
+      }
+      return columnarBatch;
+    }
+
+    ColumnVector[] readDataToColumnVectors(int numRowsToRead) {
+      ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
+
+      for (int i = 0; i < readers.length; i += 1) {
+        vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
+        int numRowsInVector = vectorHolders[i].numValues();
+        Preconditions.checkState(
+            numRowsInVector == numRowsToRead,
+            "Number of rows in the vector %s didn't match expected %s ",
+            numRowsInVector,
+            numRowsToRead);
 
-      if (rowIdMapping == null) {
-        arrowColumnVectors[i] =
-            IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
-      } else {
-        int[] rowIdMap = rowIdMapping.first();
-        Integer numRows = rowIdMapping.second();
         arrowColumnVectors[i] =
-            ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+            hasDeletes()
+                ? ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMapping, numRows)
+                : IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
       }
+      return arrowColumnVectors;
     }
-    rowStartPosInBatch += numRowsToRead;
-    ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
 
+    boolean hasDeletes() {
+      return rowIdMapping != null;
+    }
-    if (rowIdMapping == null) {
-      batch.setNumRows(numRowsToRead);
-    } else {
-      Integer numRows = rowIdMapping.second();
-      batch.setNumRows(numRows);
+
+    boolean hasEqDeletes() {
+      return deletes != null && deletes.hasEqDeletes();
     }
-    return batch;
-  }
 
-  private Pair<int[], Integer> rowIdMapping(int numRows) {
-    if (deletes != null && deletes.hasPosDeletes()) {
-      return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
-    } else {
-      return null;
+    void initRowIdMapping(int numRowsToRead) {
+      Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping(numRowsToRead);
+      if (posDeleteRowIdMapping != null) {
+        rowIdMapping = posDeleteRowIdMapping.first();
+        numRows = posDeleteRowIdMapping.second();
+      } else {
+        numRows = numRowsToRead;
+        rowIdMapping = initEqDeleteRowIdMapping(numRowsToRead);
+      }
+    }
+
+    Pair<int[], Integer> posDelRowIdMapping(int numRowsToRead) {
+      if (deletes != null && deletes.hasPosDeletes()) {
+        return buildPosDelRowIdMapping(deletes.deletedRowPositions(), numRowsToRead);
+      } else {
+        return null;
+      }
     }
-  }
 
-  /**
-   * Build a row id mapping inside a batch, which skips delete rows. For example, if the 1st and 3rd
-   * rows are deleted in a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new
-   * num of rows is 3.
-   *
-   * @param deletedRowPositions a set of deleted row positions
-   * @param numRows the num of rows
-   * @return the mapping array and the new num of rows in a batch, null if no row is deleted
-   */
-  private Pair<int[], Integer> buildRowIdMapping(
-      PositionDeleteIndex deletedRowPositions, int numRows) {
-    if (deletedRowPositions == null) {
-      return null;
+    /**
+     * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how
+     * we delete 2 rows in a batch with 8 rows in total:
+     * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
+     * Position delete 2, 6
+     * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
+     *
+     * @param deletedRowPositions a set of deleted row positions
+     * @param numRowsToRead the num of rows
+     * @return the mapping array and the new num of rows in a batch, null if no row is deleted
+     */
+    Pair<int[], Integer> buildPosDelRowIdMapping(
+        PositionDeleteIndex deletedRowPositions, int numRowsToRead) {
+      if (deletedRowPositions == null) {
+        return null;
+      }
+
+      int[] posDelRowIdMapping = new int[numRowsToRead];
+      int originalRowId = 0;
+      int currentRowId = 0;
+      while (originalRowId < numRowsToRead) {
+        if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
+          posDelRowIdMapping[currentRowId] = originalRowId;
+          currentRowId++;
+        }
+        originalRowId++;
+      }
+
+      if (currentRowId == numRowsToRead) {
+        // there is no delete in this batch
+        return null;
+      } else {
+        return Pair.of(posDelRowIdMapping, currentRowId);
+      }
     }
 
-    int[] rowIdMapping = new int[numRows];
-    int originalRowId = 0;
-    int currentRowId = 0;
-    while (originalRowId < numRows) {
-      if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
-        rowIdMapping[currentRowId] = originalRowId;
-        currentRowId++;
+    int[] initEqDeleteRowIdMapping(int numRowsToRead) {
+      int[] eqDeleteRowIdMapping = null;
+      if (hasEqDeletes()) {
+        eqDeleteRowIdMapping = new int[numRowsToRead];
+        for (int i = 0; i < numRowsToRead; i++) {
+          eqDeleteRowIdMapping[i] = i;
+        }
       }
-      originalRowId++;
+      return eqDeleteRowIdMapping;
     }
 
-    if (currentRowId == numRows) {
-      // there is no delete in this batch
-      return null;
-    } else {
-      return Pair.of(rowIdMapping, currentRowId);
+    /**
+     * Filter out the equality deleted rows. Here is an example:
+     * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
+     * Position delete 2, 6
+     * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
+     * Equality delete 1 <= x <= 3
+     * [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
+     */
+    void applyEqDelete() {
+      Iterator<InternalRow> it = columnarBatch.rowIterator();
+      int rowId = 0;
+      int currentRowId = 0;
+      while (it.hasNext()) {
+        InternalRow row = it.next();
+        if (deletes.eqDeletedRowFilter().test(row)) {
+          // the row is NOT deleted
+          // skip deleted rows by pointing to the next undeleted row Id
+          rowIdMapping[currentRowId] = rowIdMapping[rowId];
+          currentRowId++;
+        }
+
+        rowId++;
+      }
+
+      columnarBatch.setNumRows(currentRowId);
     }
   }
 }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 35d0a9cbac23..ad17ba52628d 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -76,15 +76,19 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
     Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
     if (task.file().format() == FileFormat.PARQUET) {
       SparkDeleteFilter deleteFilter = deleteFilter(task);
+      // get the required schema for filtering out equality-delete rows, in case the columns used
+      // by the equality deletes are not selected
+      Schema requiredSchema = requiredSchema(deleteFilter);
+
       Parquet.ReadBuilder builder =
           Parquet.read(location)
-              .project(expectedSchema)
+              .project(requiredSchema)
               .split(task.start(), task.length())
               .createBatchedReaderFunc(
                   fileSchema ->
                       VectorizedSparkParquetReaders.buildReader(
-                          expectedSchema,
+                          requiredSchema,
                           fileSchema,
                           /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED,
                           idToConstant,
@@ -141,6 +145,14 @@ private SparkDeleteFilter deleteFilter(FileScanTask task) {
         : new SparkDeleteFilter(task, table().schema(), expectedSchema);
   }
 
+  private Schema requiredSchema(DeleteFilter<InternalRow> deleteFilter) {
+    if (deleteFilter != null && deleteFilter.hasEqDeletes()) {
+      return deleteFilter.requiredSchema();
+    } else {
+      return expectedSchema;
+    }
+  }
+
   private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
     private final InternalRowWrapper asStructLike;
 
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index c09ce2f76c48..4ba80f01d396 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -194,14 +194,11 @@ public PartitionReaderFactory createReaderFactory() {
 
     boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
 
-    boolean hasNoEqDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasEqDeletes);
-
     boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks);
 
     boolean batchReadOrc = hasNoDeleteFiles && allOrcFileScanTasks;
 
-    boolean batchReadParquet =
-        hasNoEqDeleteFiles && allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives;
+    boolean batchReadParquet = allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives;
 
     boolean readUsingBatch = batchReadsEnabled && (batchReadOrc || batchReadParquet);
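Note on the row id mapping in ColumnarBatchReader: the two-step filtering (buildPosDelRowIdMapping followed by applyEqDelete) is easiest to follow by working through the javadoc example in plain code. The sketch below is illustrative only and is not part of the patch; the class name, the hard-coded position deletes on rows 2 and 6, and the `1 <= x <= 3` equality-delete predicate are assumptions taken from the javadoc example, and the row "value" is simplified to the row id instead of going through the Iceberg delete filter.

```java
// Standalone sketch of the batch-level row id mapping, assuming an 8-row batch,
// position deletes on rows 2 and 6, and an equality delete on values 1 <= x <= 3.
import java.util.Arrays;
import java.util.Set;
import java.util.function.LongPredicate;

public class RowIdMappingExample {
  public static void main(String[] args) {
    int numRowsToRead = 8;
    Set<Integer> positionDeletes = Set.of(2, 6); // rows deleted by position

    // Step 1: position deletes -> compact mapping [0, 1, 3, 4, 5, 7], 6 live rows
    int[] rowIdMapping = new int[numRowsToRead];
    int numRows = 0;
    for (int originalRowId = 0; originalRowId < numRowsToRead; originalRowId++) {
      if (!positionDeletes.contains(originalRowId)) {
        rowIdMapping[numRows++] = originalRowId;
      }
    }

    // Step 2: equality deletes -> drop rows whose value matches the delete predicate.
    // Here the "value" of a row is just its original row id; 1 <= x <= 3 is deleted.
    LongPredicate isLive = value -> value < 1 || value > 3;
    int currentRowId = 0;
    for (int rowId = 0; rowId < numRows; rowId++) {
      if (isLive.test(rowIdMapping[rowId])) {
        // compact in place: point the next live slot at the surviving original row id
        rowIdMapping[currentRowId++] = rowIdMapping[rowId];
      }
    }

    // prints [0, 4, 5, 7] and 4, matching "[Set Num records to 4]" in the javadoc
    System.out.println(Arrays.toString(Arrays.copyOf(rowIdMapping, currentRowId)));
    System.out.println(currentRowId);
  }
}
```

The in-place compaction mirrors what applyEqDelete does to rowIdMapping: the Arrow vectors are never copied, only the mapping array handed to ColumnVectorWithFilter and the batch's row count change, while equality deletes are applied after the batch is materialized because the filter needs the actual column values.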