@@ -104,6 +104,10 @@ public static <T> VectorHolder constantHolder(int numRows, T constantValue) {
return new ConstantVectorHolder(numRows, constantValue);
}

public static VectorHolder deletedVectorHolder(int numRows) {
return new DeletedVectorHolder(numRows);
}

public static VectorHolder dummyHolder(int numRows) {
return new ConstantVectorHolder(numRows);
}
@@ -146,4 +150,17 @@ public PositionVectorHolder(FieldVector vector, Type type, NullabilityHolder nul
}
}

public static class DeletedVectorHolder extends VectorHolder {
private final int numRows;

public DeletedVectorHolder(int numRows) {
this.numRows = numRows;
}

@Override
public int numValues() {
return numRows;
}
}

}
@@ -517,5 +517,32 @@ public void setBatchSize(int batchSize) {
}
}

/**
* A dummy vector reader which doesn't actually read files. Instead, it returns a
* DeletedVectorHolder which indicates whether a given row is deleted.
*/
public static class DeletedVectorReader extends VectorizedArrowReader {
public DeletedVectorReader() {
}

@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
return VectorHolder.deletedVectorHolder(numValsToRead);
}

@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
}

@Override
public String toString() {
return "DeletedVectorReader";
}

@Override
public void setBatchSize(int batchSize) {
}
}

}

@@ -91,7 +91,7 @@ public VectorizedReader<?> message(
reorderedFields.add(VectorizedArrowReader.positions());
}
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(new VectorizedArrowReader.ConstantVectorReader<>(false));
reorderedFields.add(new VectorizedArrowReader.DeletedVectorReader());
} else if (reader != null) {
reorderedFields.add(reader);
} else {
@@ -70,8 +70,7 @@ public class TestSparkParquetReadMetadataColumns {
private static final Schema PROJECTION_SCHEMA = new Schema(
required(100, "id", Types.LongType.get()),
required(101, "data", Types.StringType.get()),
MetadataColumns.ROW_POSITION,
MetadataColumns.IS_DELETED
MetadataColumns.ROW_POSITION
flyrain (Contributor, Author) commented on May 28, 2022:
We need this change since the class VectorizedReaderBuilder is shared by all Spark versions. The change at line 94 of VectorizedReaderBuilder changes the type of the reader, as the following code shows, and the read then throws an exception in the method IcebergArrowColumnVector.forHolder() in the older Spark versions. This change should be fine because the older Spark versions don't really support the _deleted metadata column. (The failure mode is illustrated in the sketch after this hunk.)

        reorderedFields.add(new VectorizedArrowReader.DeletedVectorReader());

);

private static final int NUM_ROWS = 1000;
@@ -104,7 +103,6 @@ public class TestSparkParquetReadMetadataColumns {
}
row.update(1, UTF8String.fromString("str" + i));
row.update(2, i);
row.update(3, false);
EXPECTED_ROWS.add(row);
}
}
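To make the failure mode described in the comment above concrete: the old dummy-holder dispatch (see the forHolder method removed from ColumnVectorWithFilter at the end of this diff) casts every dummy holder to ConstantVectorHolder, so the DeletedVectorHolder produced by the new DeletedVectorReader would fail that cast at read time. Below is a minimal, self-contained sketch of that behavior; the nested classes are simplified stand-ins, not the real Iceberg types.

// Simplified stand-ins illustrating why the old-style dummy-holder dispatch
// breaks once a DeletedVectorHolder shows up: the cast to ConstantVectorHolder
// throws ClassCastException.
public class ForHolderDispatchSketch {

  static class VectorHolder {
    boolean isDummy() { return false; }
  }

  static class ConstantVectorHolder<T> extends VectorHolder {
    private final T constant;

    ConstantVectorHolder(T constant) { this.constant = constant; }

    @Override
    boolean isDummy() { return true; }

    T getConstant() { return constant; }
  }

  static class DeletedVectorHolder extends VectorHolder {
    @Override
    boolean isDummy() { return true; }
  }

  // Old-style dispatch: every dummy holder is assumed to be a ConstantVectorHolder.
  static Object forHolder(VectorHolder holder) {
    if (holder.isDummy()) {
      return ((ConstantVectorHolder<?>) holder).getConstant(); // throws for DeletedVectorHolder
    }
    return holder;
  }

  public static void main(String[] args) {
    System.out.println(forHolder(new ConstantVectorHolder<>(false))); // prints "false"
    try {
      forHolder(new DeletedVectorHolder());
    } catch (ClassCastException e) {
      System.out.println("Old dispatch fails for DeletedVectorHolder: " + e);
    }
  }
}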
@@ -70,8 +70,7 @@ public class TestSparkParquetReadMetadataColumns {
private static final Schema PROJECTION_SCHEMA = new Schema(
required(100, "id", Types.LongType.get()),
required(101, "data", Types.StringType.get()),
MetadataColumns.ROW_POSITION,
MetadataColumns.IS_DELETED
MetadataColumns.ROW_POSITION
);

private static final int NUM_ROWS = 1000;
@@ -104,7 +103,6 @@ public class TestSparkParquetReadMetadataColumns {
}
row.update(1, UTF8String.fromString("str" + i));
row.update(2, i);
row.update(3, false);
EXPECTED_ROWS.add(row);
}
}
@@ -70,8 +70,7 @@ public class TestSparkParquetReadMetadataColumns {
private static final Schema PROJECTION_SCHEMA = new Schema(
required(100, "id", Types.LongType.get()),
required(101, "data", Types.StringType.get()),
MetadataColumns.ROW_POSITION,
MetadataColumns.IS_DELETED
MetadataColumns.ROW_POSITION
);

private static final int NUM_ROWS = 1000;
@@ -104,7 +103,6 @@ public class TestSparkParquetReadMetadataColumns {
}
row.update(1, UTF8String.fromString("str" + i));
row.update(2, i);
row.update(3, false);
EXPECTED_ROWS.add(row);
}
}
@@ -44,6 +44,7 @@
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@Fork(1)
@State(Scope.Benchmark)
@@ -118,6 +119,10 @@ protected void materialize(Dataset<?> ds) {
ds.queryExecution().toRdd().toJavaRDD().foreach(record -> { });
}

protected void materialize(Dataset<?> ds, Blackhole blackhole) {
blackhole.consume(ds.queryExecution().toRdd().toJavaRDD().count());
}

protected void appendAsFile(Dataset<Row> ds) {
// ensure the schema is precise (including nullability)
StructType sparkSchema = SparkSchemaUtil.convert(table.schema());
@@ -49,9 +49,11 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.infra.Blackhole;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
@@ -80,50 +82,79 @@ public void tearDownBenchmark() throws IOException {

@Benchmark
@Threads(1)
public void readIceberg() {
public void readIceberg(Blackhole blackhole) {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false");
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
materialize(df);
materialize(df, blackhole);
});
}

@Benchmark
@Threads(1)
public void readIcebergVectorized() {
public void readIcebergWithIsDeletedColumn(Blackhole blackhole) {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true");
tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false");
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = false");
materialize(df, blackhole);
});
}

@Benchmark
@Threads(1)
public void readDeletedRows(Blackhole blackhole) {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "false");
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = true");
materialize(df, blackhole);
});
}

@Benchmark
@Threads(1)
public void readIcebergVectorized(Blackhole blackhole) {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true");
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
materialize(df);
materialize(df, blackhole);
});
}

@Benchmark
@Threads(1)
public void readIcebergWithIsDeletedColumn() {
public void readIcebergWithIsDeletedColumnVectorized(Blackhole blackhole) {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true");
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = false");
materialize(df);
materialize(df, blackhole);
});
}

@Benchmark
@Threads(1)
public void readDeletedRows() {
public void readDeletedRowsVectorized(Blackhole blackhole) {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
tableProperties.put(PARQUET_VECTORIZATION_ENABLED, "true");
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = true");
materialize(df);
materialize(df, blackhole);
});
}

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.data.vectorized;

import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnVector;

class ColumnVectorBuilder {
private boolean[] isDeleted;
private int[] rowIdMapping;

public ColumnVectorBuilder withDeletedRows(int[] rowIdMappingArray, boolean[] isDeletedArray) {
A reviewer (Contributor) commented:
nit: I feel we'd better make this a constructor and pass these arrays only once during construction.

The PR author (Contributor) replied:
I am trying to make the builder more generic so that it can also be used to create vectors without deletes (see the usage sketch after this file).

The reviewer replied:
Okay, I see now. Then it is fine.

this.rowIdMapping = rowIdMappingArray;
this.isDeleted = isDeletedArray;
return this;
}

public ColumnVector build(VectorHolder holder, int numRows) {
if (holder.isDummy()) {
if (holder instanceof VectorHolder.DeletedVectorHolder) {
return new DeletedColumnVector(Types.BooleanType.get(), isDeleted);
} else if (holder instanceof ConstantVectorHolder) {
return new ConstantColumnVector(Types.IntegerType.get(), numRows,
((ConstantVectorHolder<?>) holder).getConstant());
} else {
throw new IllegalStateException("Unknown dummy vector holder: " + holder);
}
} else if (rowIdMapping != null) {
return new ColumnVectorWithFilter(holder, rowIdMapping);
} else {
return new IcebergArrowColumnVector(holder);
}
}
}
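Following up on the genericity point in the thread above, here is a hedged usage sketch showing both paths through ColumnVectorBuilder: one with delete state and one without. The builder, VectorHolder, and ColumnVector types come from this PR and its imports; the caller class, its method names, and the assumption that it sits in the same package (ColumnVectorBuilder is package-private) are hypothetical.

// Hypothetical caller; must live in the builder's package because the builder is package-private.
package org.apache.iceberg.spark.data.vectorized;

import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.spark.sql.vectorized.ColumnVector;

class ColumnVectorBuilderUsageSketch {

  // Path used when the batch carries delete information: the row-id mapping filters
  // live rows and the is-deleted flags back the _deleted metadata column.
  ColumnVector buildWithDeletes(VectorHolder holder, int numRows,
                                int[] rowIdMapping, boolean[] isDeleted) {
    return new ColumnVectorBuilder()
        .withDeletedRows(rowIdMapping, isDeleted)
        .build(holder, numRows);
  }

  // Same builder with no delete state: non-dummy holders become IcebergArrowColumnVector,
  // and constant dummy holders become ConstantColumnVector.
  ColumnVector buildWithoutDeletes(VectorHolder holder, int numRows) {
    return new ColumnVectorBuilder().build(holder, numRows);
  }
}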
@@ -20,10 +20,7 @@
package org.apache.iceberg.spark.data.vectorized;

import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.unsafe.types.UTF8String;

@@ -96,10 +93,4 @@ public byte[] getBinary(int rowId) {
}
return accessor().getBinary(rowIdMapping[rowId]);
}

public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) {
return holder.isDummy() ?
new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) :
new ColumnVectorWithFilter(holder, rowIdMapping);
}
}