diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java index 89cf487e6142..94c4372ffde7 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java @@ -104,6 +104,10 @@ public static VectorHolder constantHolder(int numRows, T constantValue) { return new ConstantVectorHolder(numRows, constantValue); } + public static VectorHolder deletedVectorHolder(int numRows) { + return new DeletedVectorHolder(numRows); + } + public static VectorHolder dummyHolder(int numRows) { return new ConstantVectorHolder(numRows); } @@ -146,4 +150,17 @@ public PositionVectorHolder(FieldVector vector, Type type, NullabilityHolder nul } } + public static class DeletedVectorHolder extends VectorHolder { + private final int numRows; + + public DeletedVectorHolder(int numRows) { + this.numRows = numRows; + } + + @Override + public int numValues() { + return numRows; + } + } + } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index cf75da197e92..7db664505f7e 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -517,5 +517,32 @@ public void setBatchSize(int batchSize) { } } + /** + * A Dummy Vector Reader which doesn't actually read files. Instead, it returns a + * Deleted Vector Holder which indicates whether a given row is deleted. + */ + public static class DeletedVectorReader extends VectorizedArrowReader { + public DeletedVectorReader() { + } + + @Override + public VectorHolder read(VectorHolder reuse, int numValsToRead) { + return VectorHolder.deletedVectorHolder(numValsToRead); + } + + @Override + public void setRowGroupInfo(PageReadStore source, Map metadata, long rowPosition) { + } + + @Override + public String toString() { + return "DeletedVectorReader"; + } + + @Override + public void setBatchSize(int batchSize) { + } + } + } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java index 5a72160bdf41..88480fd21b0e 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java @@ -91,7 +91,7 @@ public VectorizedReader message( reorderedFields.add(VectorizedArrowReader.positions()); } } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(new VectorizedArrowReader.ConstantVectorReader<>(false)); + reorderedFields.add(new VectorizedArrowReader.DeletedVectorReader()); } else if (reader != null) { reorderedFields.add(reader); } else { diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 0d291386b978..a8a6313dbfaa 100644 --- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -70,8 +70,7 @@ public class TestSparkParquetReadMetadataColumns { 
private static final Schema PROJECTION_SCHEMA = new Schema( required(100, "id", Types.LongType.get()), required(101, "data", Types.StringType.get()), - MetadataColumns.ROW_POSITION, - MetadataColumns.IS_DELETED + MetadataColumns.ROW_POSITION ); private static final int NUM_ROWS = 1000; @@ -104,7 +103,6 @@ public class TestSparkParquetReadMetadataColumns { } row.update(1, UTF8String.fromString("str" + i)); row.update(2, i); - row.update(3, false); EXPECTED_ROWS.add(row); } } diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 0d291386b978..a8a6313dbfaa 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -70,8 +70,7 @@ public class TestSparkParquetReadMetadataColumns { private static final Schema PROJECTION_SCHEMA = new Schema( required(100, "id", Types.LongType.get()), required(101, "data", Types.StringType.get()), - MetadataColumns.ROW_POSITION, - MetadataColumns.IS_DELETED + MetadataColumns.ROW_POSITION ); private static final int NUM_ROWS = 1000; @@ -104,7 +103,6 @@ public class TestSparkParquetReadMetadataColumns { } row.update(1, UTF8String.fromString("str" + i)); row.update(2, i); - row.update(3, false); EXPECTED_ROWS.add(row); } } diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 0d291386b978..a8a6313dbfaa 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -70,8 +70,7 @@ public class TestSparkParquetReadMetadataColumns { private static final Schema PROJECTION_SCHEMA = new Schema( required(100, "id", Types.LongType.get()), required(101, "data", Types.StringType.get()), - MetadataColumns.ROW_POSITION, - MetadataColumns.IS_DELETED + MetadataColumns.ROW_POSITION ); private static final int NUM_ROWS = 1000; @@ -104,7 +103,6 @@ public class TestSparkParquetReadMetadataColumns { } row.update(1, UTF8String.fromString("str" + i)); row.update(2, i); - row.update(3, false); EXPECTED_ROWS.add(row); } } diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 00addb8a327b..60dde6e98a16 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -44,6 +44,7 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; @Fork(1) @State(Scope.Benchmark) @@ -118,6 +119,10 @@ protected void materialize(Dataset ds) { ds.queryExecution().toRdd().toJavaRDD().foreach(record -> { }); } + protected void materialize(Dataset ds, Blackhole blackhole) { + blackhole.consume(ds.queryExecution().toRdd().toJavaRDD().count()); + } + protected void appendAsFile(Dataset ds) { // ensure the schema is precise (including nullability) StructType 
sparkSchema = SparkSchemaUtil.convert(table.schema()); diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java index 0cdf209889b7..5db431eaa50c 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -49,9 +49,11 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.infra.Blackhole; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED; import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -80,50 +82,79 @@ public void tearDownBenchmark() throws IOException { @Benchmark @Threads(1) - public void readIceberg() { + public void readIceberg(Blackhole blackhole) { Map tableProperties = Maps.newHashMap(); tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false"); withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation); - materialize(df); + materialize(df, blackhole); }); } @Benchmark @Threads(1) - public void readIcebergVectorized() { + public void readIcebergWithIsDeletedColumn(Blackhole blackhole) { Map tableProperties = Maps.newHashMap(); tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); - tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true"); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false"); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = false"); + materialize(df, blackhole); + }); + } + + @Benchmark + @Threads(1) + public void readDeletedRows(Blackhole blackhole) { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false"); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = true"); + materialize(df, blackhole); + }); + } + + @Benchmark + @Threads(1) + public void readIcebergVectorized(Blackhole blackhole) { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true"); withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation); - materialize(df); + materialize(df, blackhole); }); } @Benchmark @Threads(1) - public void readIcebergWithIsDeletedColumn() { + public void readIcebergWithIsDeletedColumnVectorized(Blackhole blackhole) { Map tableProperties = Maps.newHashMap(); tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true"); 
withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = false"); - materialize(df); + materialize(df, blackhole); }); } @Benchmark @Threads(1) - public void readDeletedRows() { + public void readDeletedRowsVectorized(Blackhole blackhole) { Map tableProperties = Maps.newHashMap(); tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true"); withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = true"); - materialize(df); + materialize(df, blackhole); }); } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java new file mode 100644 index 000000000000..c0459aae382b --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.data.vectorized; + +import org.apache.iceberg.arrow.vectorized.VectorHolder; +import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.vectorized.ColumnVector; + +class ColumnVectorBuilder { + private boolean[] isDeleted; + private int[] rowIdMapping; + + public ColumnVectorBuilder withDeletedRows(int[] rowIdMappingArray, boolean[] isDeletedArray) { + this.rowIdMapping = rowIdMappingArray; + this.isDeleted = isDeletedArray; + return this; + } + + public ColumnVector build(VectorHolder holder, int numRows) { + if (holder.isDummy()) { + if (holder instanceof VectorHolder.DeletedVectorHolder) { + return new DeletedColumnVector(Types.BooleanType.get(), isDeleted); + } else if (holder instanceof ConstantVectorHolder) { + return new ConstantColumnVector(Types.IntegerType.get(), numRows, + ((ConstantVectorHolder) holder).getConstant()); + } else { + throw new IllegalStateException("Unknown dummy vector holder: " + holder); + } + } else if (rowIdMapping != null) { + return new ColumnVectorWithFilter(holder, rowIdMapping); + } else { + return new IcebergArrowColumnVector(holder); + } + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java index 7804c02f9042..db4e41b04176 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java @@ -20,10 +20,7 @@ package org.apache.iceberg.spark.data.vectorized; import org.apache.iceberg.arrow.vectorized.VectorHolder; -import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.unsafe.types.UTF8String; @@ -96,10 +93,4 @@ public byte[] getBinary(int rowId) { } return accessor().getBinary(rowIdMapping[rowId]); } - - public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) { - return holder.isDummy() ? - new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) : - new ColumnVectorWithFilter(holder, rowIdMapping); - } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index e18058130eb2..6dada0f84332 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.iceberg.arrow.vectorized.BaseBatchReader; import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader; +import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.parquet.VectorizedReader; @@ -42,11 +43,13 @@ * {@linkplain VectorizedArrowReader VectorReader(s)}. 
*/ public class ColumnarBatchReader extends BaseBatchReader { + private final boolean hasIsDeletedColumn; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; public ColumnarBatchReader(List> readers) { super(readers); + this.hasIsDeletedColumn = readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader); } @Override @@ -66,37 +69,53 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { closeVectors(); } - ColumnBatchLoader batchLoader = new ColumnBatchLoader(numRowsToRead); + ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch(); rowStartPosInBatch += numRowsToRead; - return batchLoader.columnarBatch; + return columnarBatch; } private class ColumnBatchLoader { - private int[] rowIdMapping; // the rowId mapping to skip deleted rows for all column vectors inside a batch - private int numRows; - private ColumnarBatch columnarBatch; + private final int numRowsToRead; + // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when there is no deletes + private int[] rowIdMapping; + // the array to indicate if a row is deleted or not, it is null when there is no "_deleted" metadata column + private boolean[] isDeleted; ColumnBatchLoader(int numRowsToRead) { - initRowIdMapping(numRowsToRead); - loadDataToColumnBatch(numRowsToRead); + Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead); + this.numRowsToRead = numRowsToRead; + if (hasIsDeletedColumn) { + isDeleted = new boolean[numRowsToRead]; + } } - ColumnarBatch loadDataToColumnBatch(int numRowsToRead) { - Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead); - ColumnVector[] arrowColumnVectors = readDataToColumnVectors(numRowsToRead); + ColumnarBatch loadDataToColumnBatch() { + int numRowsUndeleted = initRowIdMapping(); - columnarBatch = new ColumnarBatch(arrowColumnVectors); - columnarBatch.setNumRows(numRows); + ColumnVector[] arrowColumnVectors = readDataToColumnVectors(); + + ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors); + newColumnarBatch.setNumRows(numRowsUndeleted); if (hasEqDeletes()) { - applyEqDelete(); + applyEqDelete(newColumnarBatch); + } + + if (hasIsDeletedColumn && rowIdMapping != null) { + // reset the row id mapping array, so that it doesn't filter out the deleted rows + for (int i = 0; i < numRowsToRead; i++) { + rowIdMapping[i] = i; + } + newColumnarBatch.setNumRows(numRowsToRead); } - return columnarBatch; + + return newColumnarBatch; } - ColumnVector[] readDataToColumnVectors(int numRowsToRead) { + ColumnVector[] readDataToColumnVectors() { ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length]; + ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder(); for (int i = 0; i < readers.length; i += 1) { vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead); int numRowsInVector = vectorHolders[i].numValues(); @@ -105,35 +124,30 @@ ColumnVector[] readDataToColumnVectors(int numRowsToRead) { "Number of rows in the vector %s didn't match expected %s ", numRowsInVector, numRowsToRead); - arrowColumnVectors[i] = hasDeletes() ? 
- ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMapping, numRows) : - IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + arrowColumnVectors[i] = columnVectorBuilder.withDeletedRows(rowIdMapping, isDeleted) + .build(vectorHolders[i], numRowsInVector); } return arrowColumnVectors; } - boolean hasDeletes() { - return rowIdMapping != null; - } - boolean hasEqDeletes() { return deletes != null && deletes.hasEqDeletes(); } - void initRowIdMapping(int numRowsToRead) { - Pair posDeleteRowIdMapping = posDelRowIdMapping(numRowsToRead); + int initRowIdMapping() { + Pair posDeleteRowIdMapping = posDelRowIdMapping(); if (posDeleteRowIdMapping != null) { rowIdMapping = posDeleteRowIdMapping.first(); - numRows = posDeleteRowIdMapping.second(); + return posDeleteRowIdMapping.second(); } else { - numRows = numRowsToRead; - rowIdMapping = initEqDeleteRowIdMapping(numRowsToRead); + rowIdMapping = initEqDeleteRowIdMapping(); + return numRowsToRead; } } - Pair posDelRowIdMapping(int numRowsToRead) { + Pair posDelRowIdMapping() { if (deletes != null && deletes.hasPosDeletes()) { - return buildPosDelRowIdMapping(deletes.deletedRowPositions(), numRowsToRead); + return buildPosDelRowIdMapping(deletes.deletedRowPositions()); } else { return null; } @@ -143,14 +157,15 @@ Pair posDelRowIdMapping(int numRowsToRead) { * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we delete 2 rows in a * batch with 8 rows in total. * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array + * [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array * Position delete 2, 6 * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6] + * [F,F,T,F,F,F,T,F] -- After applying position deletes * * @param deletedRowPositions a set of deleted row positions - * @param numRowsToRead the num of rows * @return the mapping array and the new num of rows in a batch, null if no row is deleted */ - Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions, int numRowsToRead) { + Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) { if (deletedRowPositions == null) { return null; } @@ -162,6 +177,8 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { posDelRowIdMapping[currentRowId] = originalRowId; currentRowId++; + } else if (hasIsDeletedColumn) { + isDeleted[originalRowId] = true; } originalRowId++; } @@ -174,7 +191,7 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit } } - int[] initEqDeleteRowIdMapping(int numRowsToRead) { + int[] initEqDeleteRowIdMapping() { int[] eqDeleteRowIdMapping = null; if (hasEqDeletes()) { eqDeleteRowIdMapping = new int[numRowsToRead]; @@ -188,12 +205,17 @@ int[] initEqDeleteRowIdMapping(int numRowsToRead) { /** * Filter out the equality deleted rows. 
Here is an example, * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array + * [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array * Position delete 2, 6 * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6] + * [F,F,T,F,F,F,T,F] -- After applying position deletes * Equality delete 1 <= x <= 3 * [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4] + * [F,T,T,T,F,F,T,F] -- After applying equality deletes + * + * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete */ - void applyEqDelete() { + void applyEqDelete(ColumnarBatch columnarBatch) { Iterator it = columnarBatch.rowIterator(); int rowId = 0; int currentRowId = 0; @@ -204,6 +226,8 @@ void applyEqDelete() { // skip deleted rows by pointing to the next undeleted row Id rowIdMapping[currentRowId] = rowIdMapping[rowId]; currentRowId++; + } else if (hasIsDeletedColumn) { + isDeleted[rowIdMapping[rowId]] = true; } rowId++; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java new file mode 100644 index 000000000000..8fc3d4527321 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.data.vectorized; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +public class DeletedColumnVector extends ColumnVector { + private final boolean[] isDeleted; + + public DeletedColumnVector(Type type, boolean[] isDeleted) { + super(SparkSchemaUtil.convert(type)); + Preconditions.checkArgument(isDeleted != null, "Boolean array isDeleted cannot be null"); + this.isDeleted = isDeleted; + } + + @Override + public void close() { + } + + @Override + public boolean hasNull() { + return false; + } + + @Override + public int numNulls() { + return 0; + } + + @Override + public boolean isNullAt(int rowId) { + return false; + } + + @Override + public boolean getBoolean(int rowId) { + return isDeleted[rowId]; + } + + @Override + public byte getByte(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarMap getMap(int ordinal) { + throw new UnsupportedOperationException(); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new UnsupportedOperationException(); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBinary(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnVector getChild(int ordinal) { + throw new UnsupportedOperationException(); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java index c5b7853d5353..1812282a34f6 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java @@ -22,9 +22,7 @@ import org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor; import org.apache.iceberg.arrow.vectorized.NullabilityHolder; import org.apache.iceberg.arrow.vectorized.VectorHolder; -import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnVector; @@ -153,12 +151,6 @@ public ArrowColumnVector getChild(int ordinal) { return accessor.childColumn(ordinal); } - static ColumnVector forHolder(VectorHolder holder, int 
numRows) { - return holder.isDummy() ? - new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) : - new IcebergArrowColumnVector(holder); - } - public ArrowVectorAccessor vectorAccessor() { return accessor; } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 256753f4ccc6..f075e71742ea 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -56,6 +56,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.unsafe.types.UTF8String; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -173,27 +174,28 @@ public void testReadRowNumbers() throws IOException { @Test public void testReadRowNumbersWithDelete() throws IOException { - if (vectorized) { - List expectedRowsAfterDelete = Lists.newArrayList(EXPECTED_ROWS); - // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and [100, 200) - for (int i = 1; i <= 5; i++) { - expectedRowsAfterDelete.remove(98); - } + Assume.assumeTrue(vectorized); + + List expectedRowsAfterDelete = Lists.newArrayList(); + EXPECTED_ROWS.forEach(row -> expectedRowsAfterDelete.add(row.copy())); + // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and [100, 200) + for (int i = 98; i <= 102; i++) { + expectedRowsAfterDelete.get(i).update(3, true); + } - Parquet.ReadBuilder builder = Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA); + Parquet.ReadBuilder builder = Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA); - DeleteFilter deleteFilter = mock(DeleteFilter.class); - when(deleteFilter.hasPosDeletes()).thenReturn(true); - PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex(); - deletedRowPos.delete(98, 103); - when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex(); + deletedRowPos.delete(98, 103); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); - builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA, - fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, Maps.newHashMap(), deleteFilter)); - builder.recordsPerBatch(RECORDS_PER_BATCH); + builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA, + fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, Maps.newHashMap(), deleteFilter)); + builder.recordsPerBatch(RECORDS_PER_BATCH); - validate(expectedRowsAfterDelete, builder); - } + validate(expectedRowsAfterDelete, builder); } private class CustomizedPositionDeleteIndex implements PositionDeleteIndex { diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index aa3e193ac3af..2da25d5ee529 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -65,7 +65,6 @@ import org.jetbrains.annotations.NotNull; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -277,8 +276,6 @@ public void testPosDeletesAllRowsInBatch() throws IOException { @Test public void testPosDeletesWithDeletedColumn() throws IOException { - Assume.assumeFalse(vectorized); - // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all deleted. List> deletes = Lists.newArrayList( Pair.of(dataFile.path(), 0L), // id = 29 @@ -303,8 +300,6 @@ public void testPosDeletesWithDeletedColumn() throws IOException { @Test public void testEqualityDeleteWithDeletedColumn() throws IOException { - Assume.assumeFalse(vectorized); - String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); Schema deleteRowSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteRowSchema); @@ -329,8 +324,6 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { @Test public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { - Assume.assumeFalse(vectorized); - Schema dataSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(dataSchema); List dataDeletes = Lists.newArrayList( @@ -364,8 +357,6 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { @Test public void testFilterOnDeletedMetadataColumn() throws IOException { - Assume.assumeFalse(vectorized); - List> deletes = Lists.newArrayList( Pair.of(dataFile.path(), 0L), // id = 29 Pair.of(dataFile.path(), 1L), // id = 43 @@ -419,8 +410,6 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { @Test public void testIsDeletedColumnWithoutDeleteFile() { - Assume.assumeFalse(vectorized); - StructLikeSet expected = expectedRowSet(); StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); diff --git a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 00addb8a327b..60dde6e98a16 100644 --- a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -44,6 +44,7 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; @Fork(1) @State(Scope.Benchmark) @@ -118,6 +119,10 @@ protected void materialize(Dataset ds) { ds.queryExecution().toRdd().toJavaRDD().foreach(record -> { }); } + protected void materialize(Dataset ds, Blackhole blackhole) { + blackhole.consume(ds.queryExecution().toRdd().toJavaRDD().count()); + } + protected void appendAsFile(Dataset ds) { // ensure the schema is precise (including nullability) StructType sparkSchema = SparkSchemaUtil.convert(table.schema()); diff --git a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java index 0cdf209889b7..5db431eaa50c 100644 --- 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java +++ b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -49,9 +49,11 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.infra.Blackhole; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED; import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -80,50 +82,79 @@ public void tearDownBenchmark() throws IOException { @Benchmark @Threads(1) - public void readIceberg() { + public void readIceberg(Blackhole blackhole) { Map tableProperties = Maps.newHashMap(); tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false"); withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation); - materialize(df); + materialize(df, blackhole); }); } @Benchmark @Threads(1) - public void readIcebergVectorized() { + public void readIcebergWithIsDeletedColumn(Blackhole blackhole) { Map tableProperties = Maps.newHashMap(); tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); - tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true"); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false"); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = false"); + materialize(df, blackhole); + }); + } + + @Benchmark + @Threads(1) + public void readDeletedRows(Blackhole blackhole) { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false"); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = true"); + materialize(df, blackhole); + }); + } + + @Benchmark + @Threads(1) + public void readIcebergVectorized(Blackhole blackhole) { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true"); withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation); - materialize(df); + materialize(df, blackhole); }); } @Benchmark @Threads(1) - public void readIcebergWithIsDeletedColumn() { + public void readIcebergWithIsDeletedColumnVectorized(Blackhole blackhole) { Map tableProperties = Maps.newHashMap(); tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true"); withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = false"); - materialize(df); + materialize(df, blackhole); }); } @Benchmark @Threads(1) - public void readDeletedRows() { + public void 
readDeletedRowsVectorized(Blackhole blackhole) { Map tableProperties = Maps.newHashMap(); tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true"); withTableProperties(tableProperties, () -> { String tableLocation = table().location(); Dataset df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = true"); - materialize(df); + materialize(df, blackhole); }); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java new file mode 100644 index 000000000000..c0459aae382b --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data.vectorized; + +import org.apache.iceberg.arrow.vectorized.VectorHolder; +import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.vectorized.ColumnVector; + +class ColumnVectorBuilder { + private boolean[] isDeleted; + private int[] rowIdMapping; + + public ColumnVectorBuilder withDeletedRows(int[] rowIdMappingArray, boolean[] isDeletedArray) { + this.rowIdMapping = rowIdMappingArray; + this.isDeleted = isDeletedArray; + return this; + } + + public ColumnVector build(VectorHolder holder, int numRows) { + if (holder.isDummy()) { + if (holder instanceof VectorHolder.DeletedVectorHolder) { + return new DeletedColumnVector(Types.BooleanType.get(), isDeleted); + } else if (holder instanceof ConstantVectorHolder) { + return new ConstantColumnVector(Types.IntegerType.get(), numRows, + ((ConstantVectorHolder) holder).getConstant()); + } else { + throw new IllegalStateException("Unknown dummy vector holder: " + holder); + } + } else if (rowIdMapping != null) { + return new ColumnVectorWithFilter(holder, rowIdMapping); + } else { + return new IcebergArrowColumnVector(holder); + } + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java index 7804c02f9042..db4e41b04176 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java @@ -20,10 +20,7 @@ package org.apache.iceberg.spark.data.vectorized; import org.apache.iceberg.arrow.vectorized.VectorHolder; -import 
org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.unsafe.types.UTF8String; @@ -96,10 +93,4 @@ public byte[] getBinary(int rowId) { } return accessor().getBinary(rowIdMapping[rowId]); } - - public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) { - return holder.isDummy() ? - new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) : - new ColumnVectorWithFilter(holder, rowIdMapping); - } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index e18058130eb2..6dada0f84332 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.iceberg.arrow.vectorized.BaseBatchReader; import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader; +import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.parquet.VectorizedReader; @@ -42,11 +43,13 @@ * {@linkplain VectorizedArrowReader VectorReader(s)}. */ public class ColumnarBatchReader extends BaseBatchReader { + private final boolean hasIsDeletedColumn; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; public ColumnarBatchReader(List> readers) { super(readers); + this.hasIsDeletedColumn = readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader); } @Override @@ -66,37 +69,53 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { closeVectors(); } - ColumnBatchLoader batchLoader = new ColumnBatchLoader(numRowsToRead); + ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch(); rowStartPosInBatch += numRowsToRead; - return batchLoader.columnarBatch; + return columnarBatch; } private class ColumnBatchLoader { - private int[] rowIdMapping; // the rowId mapping to skip deleted rows for all column vectors inside a batch - private int numRows; - private ColumnarBatch columnarBatch; + private final int numRowsToRead; + // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when there is no deletes + private int[] rowIdMapping; + // the array to indicate if a row is deleted or not, it is null when there is no "_deleted" metadata column + private boolean[] isDeleted; ColumnBatchLoader(int numRowsToRead) { - initRowIdMapping(numRowsToRead); - loadDataToColumnBatch(numRowsToRead); + Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead); + this.numRowsToRead = numRowsToRead; + if (hasIsDeletedColumn) { + isDeleted = new boolean[numRowsToRead]; + } } - ColumnarBatch loadDataToColumnBatch(int numRowsToRead) { - Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead); - ColumnVector[] arrowColumnVectors = readDataToColumnVectors(numRowsToRead); + ColumnarBatch loadDataToColumnBatch() { + int numRowsUndeleted = 
initRowIdMapping(); - columnarBatch = new ColumnarBatch(arrowColumnVectors); - columnarBatch.setNumRows(numRows); + ColumnVector[] arrowColumnVectors = readDataToColumnVectors(); + + ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors); + newColumnarBatch.setNumRows(numRowsUndeleted); if (hasEqDeletes()) { - applyEqDelete(); + applyEqDelete(newColumnarBatch); + } + + if (hasIsDeletedColumn && rowIdMapping != null) { + // reset the row id mapping array, so that it doesn't filter out the deleted rows + for (int i = 0; i < numRowsToRead; i++) { + rowIdMapping[i] = i; + } + newColumnarBatch.setNumRows(numRowsToRead); } - return columnarBatch; + + return newColumnarBatch; } - ColumnVector[] readDataToColumnVectors(int numRowsToRead) { + ColumnVector[] readDataToColumnVectors() { ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length]; + ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder(); for (int i = 0; i < readers.length; i += 1) { vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead); int numRowsInVector = vectorHolders[i].numValues(); @@ -105,35 +124,30 @@ ColumnVector[] readDataToColumnVectors(int numRowsToRead) { "Number of rows in the vector %s didn't match expected %s ", numRowsInVector, numRowsToRead); - arrowColumnVectors[i] = hasDeletes() ? - ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMapping, numRows) : - IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + arrowColumnVectors[i] = columnVectorBuilder.withDeletedRows(rowIdMapping, isDeleted) + .build(vectorHolders[i], numRowsInVector); } return arrowColumnVectors; } - boolean hasDeletes() { - return rowIdMapping != null; - } - boolean hasEqDeletes() { return deletes != null && deletes.hasEqDeletes(); } - void initRowIdMapping(int numRowsToRead) { - Pair posDeleteRowIdMapping = posDelRowIdMapping(numRowsToRead); + int initRowIdMapping() { + Pair posDeleteRowIdMapping = posDelRowIdMapping(); if (posDeleteRowIdMapping != null) { rowIdMapping = posDeleteRowIdMapping.first(); - numRows = posDeleteRowIdMapping.second(); + return posDeleteRowIdMapping.second(); } else { - numRows = numRowsToRead; - rowIdMapping = initEqDeleteRowIdMapping(numRowsToRead); + rowIdMapping = initEqDeleteRowIdMapping(); + return numRowsToRead; } } - Pair posDelRowIdMapping(int numRowsToRead) { + Pair posDelRowIdMapping() { if (deletes != null && deletes.hasPosDeletes()) { - return buildPosDelRowIdMapping(deletes.deletedRowPositions(), numRowsToRead); + return buildPosDelRowIdMapping(deletes.deletedRowPositions()); } else { return null; } @@ -143,14 +157,15 @@ Pair posDelRowIdMapping(int numRowsToRead) { * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we delete 2 rows in a * batch with 8 rows in total. 
* [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array + * [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array * Position delete 2, 6 * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6] + * [F,F,T,F,F,F,T,F] -- After applying position deletes * * @param deletedRowPositions a set of deleted row positions - * @param numRowsToRead the num of rows * @return the mapping array and the new num of rows in a batch, null if no row is deleted */ - Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions, int numRowsToRead) { + Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) { if (deletedRowPositions == null) { return null; } @@ -162,6 +177,8 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { posDelRowIdMapping[currentRowId] = originalRowId; currentRowId++; + } else if (hasIsDeletedColumn) { + isDeleted[originalRowId] = true; } originalRowId++; } @@ -174,7 +191,7 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit } } - int[] initEqDeleteRowIdMapping(int numRowsToRead) { + int[] initEqDeleteRowIdMapping() { int[] eqDeleteRowIdMapping = null; if (hasEqDeletes()) { eqDeleteRowIdMapping = new int[numRowsToRead]; @@ -188,12 +205,17 @@ int[] initEqDeleteRowIdMapping(int numRowsToRead) { /** * Filter out the equality deleted rows. Here is an example, * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array + * [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array * Position delete 2, 6 * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6] + * [F,F,T,F,F,F,T,F] -- After applying position deletes * Equality delete 1 <= x <= 3 * [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4] + * [F,T,T,T,F,F,T,F] -- After applying equality deletes + * + * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete */ - void applyEqDelete() { + void applyEqDelete(ColumnarBatch columnarBatch) { Iterator it = columnarBatch.rowIterator(); int rowId = 0; int currentRowId = 0; @@ -204,6 +226,8 @@ void applyEqDelete() { // skip deleted rows by pointing to the next undeleted row Id rowIdMapping[currentRowId] = rowIdMapping[rowId]; currentRowId++; + } else if (hasIsDeletedColumn) { + isDeleted[rowIdMapping[rowId]] = true; } rowId++; diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java new file mode 100644 index 000000000000..8fc3d4527321 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data.vectorized; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +public class DeletedColumnVector extends ColumnVector { + private final boolean[] isDeleted; + + public DeletedColumnVector(Type type, boolean[] isDeleted) { + super(SparkSchemaUtil.convert(type)); + Preconditions.checkArgument(isDeleted != null, "Boolean array isDeleted cannot be null"); + this.isDeleted = isDeleted; + } + + @Override + public void close() { + } + + @Override + public boolean hasNull() { + return false; + } + + @Override + public int numNulls() { + return 0; + } + + @Override + public boolean isNullAt(int rowId) { + return false; + } + + @Override + public boolean getBoolean(int rowId) { + return isDeleted[rowId]; + } + + @Override + public byte getByte(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarMap getMap(int ordinal) { + throw new UnsupportedOperationException(); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new UnsupportedOperationException(); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBinary(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnVector getChild(int ordinal) { + throw new UnsupportedOperationException(); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java index c5b7853d5353..1812282a34f6 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java @@ -22,9 +22,7 @@ import org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor; import org.apache.iceberg.arrow.vectorized.NullabilityHolder; import org.apache.iceberg.arrow.vectorized.VectorHolder; -import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnVector; @@ -153,12 +151,6 @@ public ArrowColumnVector getChild(int 
ordinal) { return accessor.childColumn(ordinal); } - static ColumnVector forHolder(VectorHolder holder, int numRows) { - return holder.isDummy() ? - new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) : - new IcebergArrowColumnVector(holder); - } - public ArrowVectorAccessor vectorAccessor() { return accessor; } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 256753f4ccc6..f075e71742ea 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -56,6 +56,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.unsafe.types.UTF8String; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -173,27 +174,28 @@ public void testReadRowNumbers() throws IOException { @Test public void testReadRowNumbersWithDelete() throws IOException { - if (vectorized) { - List expectedRowsAfterDelete = Lists.newArrayList(EXPECTED_ROWS); - // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and [100, 200) - for (int i = 1; i <= 5; i++) { - expectedRowsAfterDelete.remove(98); - } + Assume.assumeTrue(vectorized); + + List expectedRowsAfterDelete = Lists.newArrayList(); + EXPECTED_ROWS.forEach(row -> expectedRowsAfterDelete.add(row.copy())); + // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and [100, 200) + for (int i = 98; i <= 102; i++) { + expectedRowsAfterDelete.get(i).update(3, true); + } - Parquet.ReadBuilder builder = Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA); + Parquet.ReadBuilder builder = Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA); - DeleteFilter deleteFilter = mock(DeleteFilter.class); - when(deleteFilter.hasPosDeletes()).thenReturn(true); - PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex(); - deletedRowPos.delete(98, 103); - when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex(); + deletedRowPos.delete(98, 103); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); - builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA, - fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, Maps.newHashMap(), deleteFilter)); - builder.recordsPerBatch(RECORDS_PER_BATCH); + builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA, + fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, Maps.newHashMap(), deleteFilter)); + builder.recordsPerBatch(RECORDS_PER_BATCH); - validate(expectedRowsAfterDelete, builder); - } + validate(expectedRowsAfterDelete, builder); } private class CustomizedPositionDeleteIndex implements PositionDeleteIndex { diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index aa3e193ac3af..2da25d5ee529 100644 --- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -65,7 +65,6 @@ import org.jetbrains.annotations.NotNull; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -277,8 +276,6 @@ public void testPosDeletesAllRowsInBatch() throws IOException { @Test public void testPosDeletesWithDeletedColumn() throws IOException { - Assume.assumeFalse(vectorized); - // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all deleted. List> deletes = Lists.newArrayList( Pair.of(dataFile.path(), 0L), // id = 29 @@ -303,8 +300,6 @@ public void testPosDeletesWithDeletedColumn() throws IOException { @Test public void testEqualityDeleteWithDeletedColumn() throws IOException { - Assume.assumeFalse(vectorized); - String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); Schema deleteRowSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteRowSchema); @@ -329,8 +324,6 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { @Test public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { - Assume.assumeFalse(vectorized); - Schema dataSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(dataSchema); List dataDeletes = Lists.newArrayList( @@ -364,8 +357,6 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { @Test public void testFilterOnDeletedMetadataColumn() throws IOException { - Assume.assumeFalse(vectorized); - List> deletes = Lists.newArrayList( Pair.of(dataFile.path(), 0L), // id = 29 Pair.of(dataFile.path(), 1L), // id = 43 @@ -419,8 +410,6 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { @Test public void testIsDeletedColumnWithoutDeleteFile() { - Assume.assumeFalse(vectorized); - StructLikeSet expected = expectedRowSet(); StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual);
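Note on the ColumnarBatchReader changes above (identical in spark/v3.2 and spark/v3.3): buildPosDelRowIdMapping compacts the ids of undeleted rows to the front of rowIdMapping, while isDeleted records, at the original positions, which rows were position-deleted. A minimal standalone sketch of that bookkeeping, using a plain java.util.Set of deleted file positions in place of Iceberg's PositionDeleteIndex (class and variable names here are illustrative, not the Iceberg API):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class PosDeleteMappingSketch {
  public static void main(String[] args) {
    int numRowsToRead = 8;
    long rowStartPosInBatch = 0L;                 // batch starts at file position 0
    Set<Long> deletedPositions = new HashSet<>(Arrays.asList(2L, 6L));

    int[] rowIdMapping = new int[numRowsToRead];  // compacted ids of undeleted rows
    boolean[] isDeleted = new boolean[numRowsToRead];

    int liveRows = 0;
    for (int originalRowId = 0; originalRowId < numRowsToRead; originalRowId++) {
      if (!deletedPositions.contains(originalRowId + rowStartPosInBatch)) {
        rowIdMapping[liveRows++] = originalRowId; // keep: point this slot at the original row
      } else {
        isDeleted[originalRowId] = true;          // remember it for the _deleted metadata column
      }
    }

    // mapping = [0, 1, 3, 4, 5, 7, 0, 0] with 6 live rows, isDeleted = [F, F, T, F, F, F, T, F]
    System.out.println("mapping   = " + Arrays.toString(rowIdMapping) + ", live rows = " + liveRows);
    System.out.println("isDeleted = " + Arrays.toString(isDeleted));
  }
}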
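applyEqDelete then makes a second pass over the already position-compacted mapping, matching the javadoc example above ([0,1,3,4,5,7,-,-] followed by equality delete 1 <= x <= 3). A sketch of that pass under the same assumptions, with an IntPredicate over original row ids standing in for the row-based equality filter supplied by DeleteFilter (an assumed stand-in, not the real API, which tests rows from the ColumnarBatch iterator):

import java.util.Arrays;
import java.util.function.IntPredicate;

public class EqDeleteMappingSketch {
  public static void main(String[] args) {
    // State after the position-delete pass from the previous sketch.
    int[] rowIdMapping = {0, 1, 3, 4, 5, 7, 0, 0};
    boolean[] isDeleted = {false, false, true, false, false, false, true, false};
    int numLiveRows = 6;

    // Equality delete 1 <= x <= 3, expressed over original row ids for illustration only.
    IntPredicate isEqDeleted = originalRowId -> originalRowId >= 1 && originalRowId <= 3;

    int currentRowId = 0;
    for (int rowId = 0; rowId < numLiveRows; rowId++) {
      int originalRowId = rowIdMapping[rowId];
      if (!isEqDeleted.test(originalRowId)) {
        rowIdMapping[currentRowId++] = originalRowId; // compact surviving rows to the front
      } else {
        isDeleted[originalRowId] = true;              // also surfaced through _deleted
      }
    }

    // mapping = [0, 4, 5, 7, ...] with 4 live rows, isDeleted = [F, T, T, T, F, F, T, F]
    System.out.println("mapping   = " + Arrays.toString(rowIdMapping) + ", live rows = " + currentRowId);
    System.out.println("isDeleted = " + Arrays.toString(isDeleted));
  }
}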
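When the _deleted metadata column is projected, loadDataToColumnBatch resets the mapping to the identity and restores the full row count, so deleted rows stay in the batch and the new DeletedColumnVector answers for them from the shared isDeleted array. A rough sketch of what a caller then observes, reusing the DeletedColumnVector and Types classes touched in this patch (the surrounding main method is illustrative only, and it assumes iceberg-spark and Spark on the classpath):

import java.util.Arrays;
import org.apache.iceberg.spark.data.vectorized.DeletedColumnVector;
import org.apache.iceberg.types.Types;

public class DeletedColumnVectorSketch {
  public static void main(String[] args) {
    // isDeleted as produced by the two passes sketched above
    boolean[] isDeleted = {false, true, true, true, false, false, true, false};

    // reset the mapping to the identity so deleted rows are no longer skipped
    int[] rowIdMapping = new int[isDeleted.length];
    for (int i = 0; i < rowIdMapping.length; i++) {
      rowIdMapping[i] = i;
    }
    System.out.println("identity mapping = " + Arrays.toString(rowIdMapping));

    // the _deleted column is backed directly by the isDeleted array
    DeletedColumnVector deleted = new DeletedColumnVector(Types.BooleanType.get(), isDeleted);
    System.out.println(deleted.getBoolean(0)); // false -> live row
    System.out.println(deleted.getBoolean(2)); // true  -> deleted row, still returned in the batch
  }
}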
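On the benchmark changes: the new materialize(Dataset, Blackhole) overload feeds the row count into JMH's Blackhole instead of running a no-op foreach, so the JIT cannot eliminate the scan as dead code. The same pattern in isolation, as a generic JMH sketch not tied to Iceberg:

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.infra.Blackhole;

public class BlackholeSketch {
  @Benchmark
  public void sum(Blackhole blackhole) {
    long total = 0;
    for (int i = 0; i < 1_000; i++) {
      total += i;
    }
    // without this, the loop result is unused and could be optimized away
    blackhole.consume(total);
  }
}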