diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 19bcdd672157..b758e9b2c09f 100644 --- a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateProperties; @@ -196,4 +197,8 @@ protected void withTableProperties(Map props, Action action) { restoreProperties.commit(); } } + + protected FileFormat fileFormat() { + throw new UnsupportedOperationException("Unsupported file format"); + } } diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java new file mode 100644 index 000000000000..d970175d651f --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.ClusteredEqualityDeleteWriter; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class IcebergSourceDeleteBenchmark extends IcebergSourceBenchmark { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class); + private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024; + + protected static final int NUM_FILES = 1; + protected static final int NUM_ROWS = 10 * 1000 * 1000; + + @Setup + public void setupBenchmark() throws IOException { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties( + tableProperties, + () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readIcebergVectorized() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true"); + withTableProperties( + tableProperties, + () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + protected abstract void appendData() throws IOException; + + protected void writeData(int fileNum) { + Dataset df = + spark() + .range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)")) + 
.withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), fileNum)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + appendAsFile(df); + } + + @Override + protected Table initTable() { + Schema schema = + new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + Map properties = Maps.newHashMap(); + properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); + properties.put(TableProperties.FORMAT_VERSION, "2"); + return tables.create(schema, partitionSpec, properties, newTableLocation()); + } + + @Override + protected Configuration initHadoopConf() { + return new Configuration(); + } + + protected void writePosDeletes(CharSequence path, long numRows, double percentage) + throws IOException { + writePosDeletes(path, numRows, percentage, 1); + } + + protected void writePosDeletes( + CharSequence path, long numRows, double percentage, int numDeleteFile) throws IOException { + writePosDeletesWithNoise(path, numRows, percentage, 0, numDeleteFile); + } + + protected void writePosDeletesWithNoise( + CharSequence path, long numRows, double percentage, int numNoise, int numDeleteFile) + throws IOException { + Set deletedPos = Sets.newHashSet(); + while (deletedPos.size() < numRows * percentage) { + deletedPos.add(ThreadLocalRandom.current().nextLong(numRows)); + } + LOG.info("pos delete row count: {}, num of delete files: {}", deletedPos.size(), numDeleteFile); + + int partitionSize = (int) (numRows * percentage) / numDeleteFile; + Iterable> sets = Iterables.partition(deletedPos, partitionSize); + for (List item : sets) { + writePosDeletes(path, item, numNoise); + } + } + + protected void writePosDeletes(CharSequence path, List deletedPos, int numNoise) + throws IOException { + OutputFileFactory fileFactory = newFileFactory(); + SparkFileWriterFactory writerFactory = + SparkFileWriterFactory.builderFor(table()).dataFileFormat(fileFormat()).build(); + + ClusteredPositionDeleteWriter writer = + new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table().io(), TARGET_FILE_SIZE_IN_BYTES); + + PartitionSpec unpartitionedSpec = table().specs().get(0); + + PositionDelete positionDelete = PositionDelete.create(); + try (ClusteredPositionDeleteWriter closeableWriter = writer) { + for (Long pos : deletedPos) { + positionDelete.set(path, pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + for (int i = 0; i < numNoise; i++) { + positionDelete.set(noisePath(path), pos, null); + closeableWriter.write(positionDelete, unpartitionedSpec, null); + } + } + } + + RowDelta rowDelta = table().newRowDelta(); + writer.result().deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.validateDeletedFiles().commit(); + } + + protected void writeEqDeletes(long numRows, double percentage) throws IOException { + Set deletedValues = Sets.newHashSet(); + while (deletedValues.size() < numRows * percentage) { + deletedValues.add(ThreadLocalRandom.current().nextLong(numRows)); 
+ } + + List rows = Lists.newArrayList(); + for (Long value : deletedValues) { + GenericInternalRow genericInternalRow = new GenericInternalRow(7); + genericInternalRow.setLong(0, value); + genericInternalRow.setInt(1, (int) (value % Integer.MAX_VALUE)); + genericInternalRow.setFloat(2, (float) value); + genericInternalRow.setNullAt(3); + genericInternalRow.setNullAt(4); + genericInternalRow.setNullAt(5); + genericInternalRow.setNullAt(6); + rows.add(genericInternalRow); + } + LOG.info("Num of equality deleted rows: {}", rows.size()); + + writeEqDeletes(rows); + } + + private void writeEqDeletes(List rows) throws IOException { + int equalityFieldId = table().schema().findField("longCol").fieldId(); + + OutputFileFactory fileFactory = newFileFactory(); + SparkFileWriterFactory writerFactory = + SparkFileWriterFactory.builderFor(table()) + .dataFileFormat(fileFormat()) + .equalityDeleteRowSchema(table().schema()) + .equalityFieldIds(new int[] {equalityFieldId}) + .build(); + + ClusteredEqualityDeleteWriter writer = + new ClusteredEqualityDeleteWriter<>( + writerFactory, fileFactory, table().io(), TARGET_FILE_SIZE_IN_BYTES); + + PartitionSpec unpartitionedSpec = table().specs().get(0); + try (ClusteredEqualityDeleteWriter closeableWriter = writer) { + for (InternalRow row : rows) { + closeableWriter.write(row, unpartitionedSpec, null); + } + } + + RowDelta rowDelta = table().newRowDelta(); + LOG.info("Num of Delete File: {}", writer.result().deleteFiles().size()); + writer.result().deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.validateDeletedFiles().commit(); + } + + private OutputFileFactory newFileFactory() { + return OutputFileFactory.builderFor(table(), 1, 1).format(fileFormat()).build(); + } + + private CharSequence noisePath(CharSequence path) { + // assume the data file name would be something like + // "00000-0-30da64e0-56b5-4743-a11b-3188a1695bf7-00001.parquet" + // so the dataFileSuffixLen is the UUID string length + length of "-00001.parquet", which is 36 + // + 14 = 60. It's OK + // to be not accurate here. + int dataFileSuffixLen = 60; + UUID uuid = UUID.randomUUID(); + if (path.length() > dataFileSuffixLen) { + return path.subSequence(0, dataFileSuffixLen) + uuid.toString(); + } else { + return uuid.toString(); + } + } +} diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java new file mode 100644 index 000000000000..20a47d327e6d --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with equality delete in + * the Spark data source for Iceberg. + * + *
<p>
This class uses a dataset with a flat schema. To run this benchmark for spark-3.2: + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetEqDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-eq-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetEqDeleteBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"}) + private double percentDeleteRow; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + if (percentDeleteRow > 0) { + // add equality deletes + table().refresh(); + writeEqDeletes(NUM_ROWS, percentDeleteRow); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java new file mode 100644 index 000000000000..c4a15ee4cbaf --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the + * Spark data source for Iceberg. + * + *
<p>
This class uses a dataset with a flat schema. To run this benchmark for spark-3.2: + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetMultiDeleteFileBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-multi-delete-file-benchmark-result.txt + * + */ +public class IcebergSourceParquetMultiDeleteFileBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"1", "2", "5", "10"}) + private int numDeleteFile; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, 0.25, numDeleteFile); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java new file mode 100644 index 000000000000..4852f3bd547f --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the + * Spark data source for Iceberg. + * + *
<p>
This class uses a dataset with a flat schema. To run this benchmark for spark-3.2: + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetPosDeleteBenchmark extends IcebergSourceDeleteBenchmark { + @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"}) + private double percentDeleteRow; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + if (percentDeleteRow > 0) { + // add pos-deletes + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletes(file.path(), NUM_ROWS, percentDeleteRow); + } + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java new file mode 100644 index 000000000000..9d7271945a46 --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.parquet; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark; +import org.openjdk.jmh.annotations.Param; + +/** + * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the + * Spark data source for Iceberg. + * + *
<p>
This class uses a dataset with a flat schema. To run this benchmark for spark-3.2: + * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh + * -PjmhIncludeRegex=IcebergSourceParquetWithUnrelatedDeleteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-parquet-with-unrelated-delete-benchmark-result.txt + * + */ +public class IcebergSourceParquetWithUnrelatedDeleteBenchmark extends IcebergSourceDeleteBenchmark { + private static final double PERCENT_DELETE_ROW = 0.05; + + @Param({"0", "0.05", "0.25", "0.5"}) + private double percentUnrelatedDeletes; + + @Override + protected void appendData() throws IOException { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + writeData(fileNum); + + table().refresh(); + for (DataFile file : table().currentSnapshot().addedFiles()) { + writePosDeletesWithNoise( + file.path(), + NUM_ROWS, + PERCENT_DELETE_ROW, + (int) (percentUnrelatedDeletes / PERCENT_DELETE_ROW), + 1); + } + } + } + + @Override + protected FileFormat fileFormat() { + return FileFormat.PARQUET; + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java index a5a58922c933..2bfd0aaf8da7 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java @@ -90,7 +90,6 @@ import org.apache.spark.sql.types.IntegerType; import org.apache.spark.sql.types.LongType; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; import scala.Option; import scala.Predef; import scala.Some; @@ -508,48 +507,6 @@ public static String describe(org.apache.iceberg.SortOrder order) { return Joiner.on(", ").join(SortOrderVisitor.visit(order, DescribeSortOrderVisitor.INSTANCE)); } - public static Long propertyAsLong( - CaseInsensitiveStringMap options, String property, Long defaultValue) { - if (defaultValue != null) { - return options.getLong(property, defaultValue); - } - - String value = options.get(property); - if (value != null) { - return Long.parseLong(value); - } - - return null; - } - - public static Integer propertyAsInt( - CaseInsensitiveStringMap options, String property, Integer defaultValue) { - if (defaultValue != null) { - return options.getInt(property, defaultValue); - } - - String value = options.get(property); - if (value != null) { - return Integer.parseInt(value); - } - - return null; - } - - public static Boolean propertyAsBoolean( - CaseInsensitiveStringMap options, String property, Boolean defaultValue) { - if (defaultValue != null) { - return options.getBoolean(property, defaultValue); - } - - String value = options.get(property); - if (value != null) { - return Boolean.parseBoolean(value); - } - - return null; - } - public static class DescribeSchemaVisitor extends TypeUtil.SchemaVisitor { private static final Joiner COMMA = Joiner.on(','); private static final DescribeSchemaVisitor INSTANCE = new DescribeSchemaVisitor(); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java index 33e5ca936800..e1425042bdbb 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java @@ -67,6 +67,11 @@ public BooleanConfParser defaultValue(boolean value) { return self(); } + public BooleanConfParser defaultValue(String value) { 
+ this.defaultValue = Boolean.parseBoolean(value); + return self(); + } + public boolean parse() { Preconditions.checkArgument(defaultValue != null, "Default value cannot be null"); return parse(Boolean::parseBoolean, defaultValue); @@ -90,6 +95,10 @@ public int parse() { Preconditions.checkArgument(defaultValue != null, "Default value cannot be null"); return parse(Integer::parseInt, defaultValue); } + + public Integer parseOptional() { + return parse(Integer::parseInt, null); + } } class LongConfParser extends ConfParser { @@ -132,6 +141,10 @@ public String parse() { Preconditions.checkArgument(defaultValue != null, "Default value cannot be null"); return parse(Function.identity(), defaultValue); } + + public String parseOptional() { + return parse(Function.identity(), null); + } } abstract class ConfParser { diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 937c31e45960..184c5ac168d5 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -50,16 +50,22 @@ public class SparkReadConf { private static final Set LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs"); + private final SparkSession spark; private final Table table; private final Map readOptions; private final SparkConfParser confParser; public SparkReadConf(SparkSession spark, Table table, Map readOptions) { + this.spark = spark; this.table = table; this.readOptions = readOptions; this.confParser = new SparkConfParser(spark, table, readOptions); } + public boolean caseSensitive() { + return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive")); + } + public boolean localityEnabled() { InputFile file = table.io().newInputFile(table.location()); @@ -88,6 +94,10 @@ public Long endSnapshotId() { return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional(); } + public String fileScanTaskSetId() { + return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional(); + } + public boolean streamingSkipDeleteSnapshots() { return confParser .booleanConf() @@ -142,6 +152,18 @@ public int orcBatchSize() { .parse(); } + public Long splitSizeOption() { + return confParser.longConf().option(SparkReadOptions.SPLIT_SIZE).parseOptional(); + } + + public Integer splitLookbackOption() { + return confParser.intConf().option(SparkReadOptions.LOOKBACK).parseOptional(); + } + + public Long splitOpenFileCostOption() { + return confParser.longConf().option(SparkReadOptions.FILE_OPEN_COST).parseOptional(); + } + public long splitSize() { return confParser .longConf() diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java index 653987e654aa..d0a745591163 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java @@ -22,8 +22,11 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Binder; import org.apache.iceberg.expressions.Expression; import 
org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -305,4 +308,20 @@ public static long estimateSize(StructType tableSchema, long totalRecords) { } return result; } + + public static void validateMetadataColumnReferences(Schema tableSchema, Schema readSchema) { + List conflictingColumnNames = + readSchema.columns().stream() + .map(Types.NestedField::name) + .filter( + name -> + MetadataColumns.isMetadataColumn(name) && tableSchema.findField(name) != null) + .collect(Collectors.toList()); + + ValidationException.check( + conflictingColumnNames.isEmpty(), + "Table column names conflict with names reserved for Iceberg metadata columns: %s.\n" + + "Please, use ALTER TABLE statements to rename the conflicting table columns.", + conflictingColumnNames); + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 08b3fbee7590..d42c23bc3b24 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -99,10 +99,18 @@ public boolean handleTimestampWithoutZone() { } public String overwriteMode() { - String overwriteMode = writeOptions.get("overwrite-mode"); + String overwriteMode = writeOptions.get(SparkWriteOptions.OVERWRITE_MODE); return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null; } + public boolean wapEnabled() { + return confParser + .booleanConf() + .tableProperty(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED) + .defaultValue(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT) + .parse(); + } + public String wapId() { return sessionConf.get("spark.wap.id", null); } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 0ba435ae7429..ef25f871aed7 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -50,4 +50,6 @@ private SparkWriteOptions() {} // Controls whether to allow writing timestamps without zone info public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE = "handle-timestamp-without-timezone"; + + public static final String OVERWRITE_MODE = "overwrite-mode"; } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java index a79f075ef442..72b6268026ab 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java @@ -60,9 +60,9 @@ import org.slf4j.LoggerFactory; /** - * An action that removes orphan metadata and data files by listing a given location and comparing - * the actual files in that location with data and metadata files referenced by all valid snapshots. - * The location must be accessible for listing via the Hadoop {@link FileSystem}. + * An action that removes orphan metadata, data and delete files by listing a given location and + * comparing the actual files in that location with content and metadata files referenced by all + * valid snapshots. The location must be accessible for listing via the Hadoop {@link FileSystem}. * *
<p>
By default, this action cleans up the table location returned by {@link Table#location()} and * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can @@ -169,7 +169,7 @@ private String jobDesc() { } private DeleteOrphanFiles.Result doExecute() { - Dataset validDataFileDF = buildValidDataFileDF(table); + Dataset validDataFileDF = buildValidContentFileDF(table); Dataset validMetadataFileDF = buildValidMetadataFileDF(table); Dataset validFileDF = validDataFileDF.union(validMetadataFileDF); Dataset actualFileDF = buildActualFileDF(); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java index 1431ae5d78ec..a1bc19d7dcc0 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java @@ -60,7 +60,7 @@ public class BaseDeleteReachableFilesSparkAction private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class); - private static final String DATA_FILE = "Data File"; + private static final String CONTENT_FILE = "Content File"; private static final String MANIFEST = "Manifest"; private static final String MANIFEST_LIST = "Manifest List"; private static final String OTHERS = "Others"; @@ -140,7 +140,7 @@ private Dataset projectFilePathWithType(Dataset ds, String type) { private Dataset buildValidFileDF(TableMetadata metadata) { Table staticTable = newStaticTable(metadata, io); - return projectFilePathWithType(buildValidDataFileDF(staticTable), DATA_FILE) + return projectFilePathWithType(buildValidContentFileDF(staticTable), CONTENT_FILE) .union(projectFilePathWithType(buildManifestFileDF(staticTable), MANIFEST)) .union(projectFilePathWithType(buildManifestListDF(staticTable), MANIFEST_LIST)) .union(projectFilePathWithType(buildOtherMetadataFileDF(staticTable), OTHERS)); @@ -183,9 +183,9 @@ private BaseDeleteReachableFilesActionResult deleteFiles(Iterator deleted) String type = fileInfo.getString(1); removeFunc.accept(file); switch (type) { - case DATA_FILE: + case CONTENT_FILE: dataFileCount.incrementAndGet(); - LOG.trace("Deleted Data File: {}", file); + LOG.trace("Deleted Content File: {}", file); break; case MANIFEST: manifestCount.incrementAndGet(); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java index 2e1f0c079eca..da9907fe325a 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java @@ -57,7 +57,7 @@ * *
<p>
This action first leverages {@link org.apache.iceberg.ExpireSnapshots} to expire snapshots and * then uses metadata tables to find files that can be safely deleted. This is done by anti-joining - * two Datasets that contain all manifest and data files before and after the expiration. The + * two Datasets that contain all manifest and content files before and after the expiration. The * snapshot expiration will be fully committed before any deletes are issued. * *
<p>
This operation performs a shuffle so the parallelism can be controlled through @@ -72,7 +72,7 @@ public class BaseExpireSnapshotsSparkAction public static final String STREAM_RESULTS = "stream-results"; - private static final String DATA_FILE = "Data File"; + private static final String CONTENT_FILE = "Content File"; private static final String MANIFEST = "Manifest"; private static final String MANIFEST_LIST = "Manifest List"; @@ -233,7 +233,7 @@ private Dataset appendTypeString(Dataset ds, String type) { private Dataset buildValidFileDF(TableMetadata metadata) { Table staticTable = newStaticTable(metadata, this.table.io()); - return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE) + return appendTypeString(buildValidContentFileDF(staticTable), CONTENT_FILE) .union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST)) .union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST)); } @@ -266,9 +266,9 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator expired) { String type = fileInfo.getString(1); deleteFunc.accept(file); switch (type) { - case DATA_FILE: + case CONTENT_FILE: dataFileCount.incrementAndGet(); - LOG.trace("Deleted Data File: {}", file); + LOG.trace("Deleted Content File: {}", file); break; case MANIFEST: manifestCount.incrementAndGet(); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index c9d93ce9de5f..5abfcc4482a4 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -110,7 +110,8 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) { return new BaseTable(ops, metadataFileLocation); } - protected Dataset buildValidDataFileDF(Table table) { + // builds a DF of delete and data file locations by reading all manifests + protected Dataset buildValidContentFileDF(Table table) { JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext()); Broadcast ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table)); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java new file mode 100644 index 000000000000..2372d6fddd4a --- /dev/null +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.data.vectorized; + +import org.apache.iceberg.arrow.vectorized.VectorHolder; +import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.unsafe.types.UTF8String; + +public class ColumnVectorWithFilter extends IcebergArrowColumnVector { + private final int[] rowIdMapping; + + public ColumnVectorWithFilter(VectorHolder holder, int[] rowIdMapping) { + super(holder); + this.rowIdMapping = rowIdMapping; + } + + @Override + public boolean isNullAt(int rowId) { + return nullabilityHolder().isNullAt(rowIdMapping[rowId]) == 1; + } + + @Override + public boolean getBoolean(int rowId) { + return accessor().getBoolean(rowIdMapping[rowId]); + } + + @Override + public int getInt(int rowId) { + return accessor().getInt(rowIdMapping[rowId]); + } + + @Override + public long getLong(int rowId) { + return accessor().getLong(rowIdMapping[rowId]); + } + + @Override + public float getFloat(int rowId) { + return accessor().getFloat(rowIdMapping[rowId]); + } + + @Override + public double getDouble(int rowId) { + return accessor().getDouble(rowIdMapping[rowId]); + } + + @Override + public ColumnarArray getArray(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getArray(rowIdMapping[rowId]); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getDecimal(rowIdMapping[rowId], precision, scale); + } + + @Override + public UTF8String getUTF8String(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getUTF8String(rowIdMapping[rowId]); + } + + @Override + public byte[] getBinary(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor().getBinary(rowIdMapping[rowId]); + } + + public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) { + return holder.isDummy() + ? 
new ConstantColumnVector( + Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) + : new ColumnVectorWithFilter(holder, rowIdMapping); + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f761b2eb551b..a7ef08dd3ba8 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -18,11 +18,20 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.iceberg.arrow.vectorized.BaseBatchReader; import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Pair; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -32,33 +41,177 @@ * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}. */ public class ColumnarBatchReader extends BaseBatchReader { + private DeleteFilter deletes = null; + private long rowStartPosInBatch = 0; public ColumnarBatchReader(List> readers) { super(readers); } @Override - public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { - Preconditions.checkArgument( - numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead); - ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length]; + public void setRowGroupInfo( + PageReadStore pageStore, Map metaData, long rowPosition) { + super.setRowGroupInfo(pageStore, metaData, rowPosition); + this.rowStartPosInBatch = rowPosition; + } + public void setDeleteFilter(DeleteFilter deleteFilter) { + this.deletes = deleteFilter; + } + + @Override + public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { if (reuse == null) { closeVectors(); } - for (int i = 0; i < readers.length; i += 1) { - vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead); - int numRowsInVector = vectorHolders[i].numValues(); - Preconditions.checkState( - numRowsInVector == numRowsToRead, - "Number of rows in the vector %s didn't match expected %s ", - numRowsInVector, - numRowsToRead); - arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + ColumnBatchLoader batchLoader = new ColumnBatchLoader(numRowsToRead); + rowStartPosInBatch += numRowsToRead; + return batchLoader.columnarBatch; + } + + private class ColumnBatchLoader { + private int[] + rowIdMapping; // the rowId mapping to skip deleted rows for all column vectors inside a + // batch + private int numRows; + private ColumnarBatch columnarBatch; + + ColumnBatchLoader(int numRowsToRead) { + initRowIdMapping(numRowsToRead); + loadDataToColumnBatch(numRowsToRead); + } + + ColumnarBatch loadDataToColumnBatch(int numRowsToRead) { + Preconditions.checkArgument( + numRowsToRead > 0, "Invalid number of rows 
to read: %s", numRowsToRead); + ColumnVector[] arrowColumnVectors = readDataToColumnVectors(numRowsToRead); + + columnarBatch = new ColumnarBatch(arrowColumnVectors); + columnarBatch.setNumRows(numRows); + + if (hasEqDeletes()) { + applyEqDelete(); + } + return columnarBatch; + } + + ColumnVector[] readDataToColumnVectors(int numRowsToRead) { + ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length]; + + for (int i = 0; i < readers.length; i += 1) { + vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead); + int numRowsInVector = vectorHolders[i].numValues(); + Preconditions.checkState( + numRowsInVector == numRowsToRead, + "Number of rows in the vector %s didn't match expected %s ", + numRowsInVector, + numRowsToRead); + + arrowColumnVectors[i] = + hasDeletes() + ? ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMapping, numRows) + : IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector); + } + return arrowColumnVectors; + } + + boolean hasDeletes() { + return rowIdMapping != null; + } + + boolean hasEqDeletes() { + return deletes != null && deletes.hasEqDeletes(); + } + + void initRowIdMapping(int numRowsToRead) { + Pair posDeleteRowIdMapping = posDelRowIdMapping(numRowsToRead); + if (posDeleteRowIdMapping != null) { + rowIdMapping = posDeleteRowIdMapping.first(); + numRows = posDeleteRowIdMapping.second(); + } else { + numRows = numRowsToRead; + rowIdMapping = initEqDeleteRowIdMapping(numRowsToRead); + } + } + + Pair posDelRowIdMapping(int numRowsToRead) { + if (deletes != null && deletes.hasPosDeletes()) { + return buildPosDelRowIdMapping(deletes.deletedRowPositions(), numRowsToRead); + } else { + return null; + } + } + + /** + * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we + * delete 2 rows in a batch with 8 rows in total. [0,1,2,3,4,5,6,7] -- Original status of the + * row id mapping array Position delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position + * deletes [Set Num records to 6] + * + * @param deletedRowPositions a set of deleted row positions + * @param numRowsToRead the num of rows + * @return the mapping array and the new num of rows in a batch, null if no row is deleted + */ + Pair buildPosDelRowIdMapping( + PositionDeleteIndex deletedRowPositions, int numRowsToRead) { + if (deletedRowPositions == null) { + return null; + } + + int[] posDelRowIdMapping = new int[numRowsToRead]; + int originalRowId = 0; + int currentRowId = 0; + while (originalRowId < numRowsToRead) { + if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { + posDelRowIdMapping[currentRowId] = originalRowId; + currentRowId++; + } + originalRowId++; + } + + if (currentRowId == numRowsToRead) { + // there is no delete in this batch + return null; + } else { + return Pair.of(posDelRowIdMapping, currentRowId); + } + } + + int[] initEqDeleteRowIdMapping(int numRowsToRead) { + int[] eqDeleteRowIdMapping = null; + if (hasEqDeletes()) { + eqDeleteRowIdMapping = new int[numRowsToRead]; + for (int i = 0; i < numRowsToRead; i++) { + eqDeleteRowIdMapping[i] = i; + } + } + return eqDeleteRowIdMapping; + } + + /** + * Filter out the equality deleted rows. 
Here is an example, [0,1,2,3,4,5,6,7] -- Original + * status of the row id mapping array Position delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying + * position deletes [Set Num records to 6] Equality delete 1 <= x <= 3 [0,4,5,7,-,-,-,-] -- + * After applying equality deletes [Set Num records to 4] + */ + void applyEqDelete() { + Iterator it = columnarBatch.rowIterator(); + int rowId = 0; + int currentRowId = 0; + while (it.hasNext()) { + InternalRow row = it.next(); + if (deletes.eqDeletedRowFilter().test(row)) { + // the row is NOT deleted + // skip deleted rows by pointing to the next undeleted row Id + rowIdMapping[currentRowId] = rowIdMapping[rowId]; + currentRowId++; + } + + rowId++; + } + + columnarBatch.setNumRows(currentRowId); } - ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors); - batch.setNumRows(numRowsToRead); - return batch; } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java index 33c1a5284818..26a12c239f86 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java @@ -48,6 +48,14 @@ public IcebergArrowColumnVector(VectorHolder holder) { this.accessor = ArrowVectorAccessors.getVectorAccessor(holder); } + protected ArrowVectorAccessor accessor() { + return accessor; + } + + protected NullabilityHolder nullabilityHolder() { + return nullabilityHolder; + } + @Override public void close() { accessor.close(); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index bbb63e077bc6..bf85bdb7ed05 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -18,12 +18,17 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.catalyst.InternalRow; public class VectorizedSparkParquetReaders { @@ -50,4 +55,47 @@ public static ColumnarBatchReader buildReader( idToConstant, ColumnarBatchReader::new)); } + + public static ColumnarBatchReader buildReader( + Schema expectedSchema, + MessageType fileSchema, + boolean setArrowValidityVector, + Map idToConstant, + DeleteFilter deleteFilter) { + return (ColumnarBatchReader) + TypeWithSchemaVisitor.visit( + expectedSchema.asStruct(), + fileSchema, + new ReaderBuilder( + expectedSchema, + fileSchema, + setArrowValidityVector, + idToConstant, + ColumnarBatchReader::new, + deleteFilter)); + } + + private static class ReaderBuilder extends VectorizedReaderBuilder { + private final DeleteFilter deleteFilter; + + ReaderBuilder( + Schema expectedSchema, + MessageType parquetSchema, + boolean 
setArrowValidityVector, + Map idToConstant, + Function>, VectorizedReader> readerFactory, + DeleteFilter deleteFilter) { + super(expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory); + this.deleteFilter = deleteFilter; + } + + @Override + protected VectorizedReader vectorizedReader(List> reorderedFields) { + VectorizedReader reader = super.vectorizedReader(reorderedFields); + if (deleteFilter != null) { + ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter); + } + return reader; + } + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java index 2cab8ee238e0..f3ddd50eef4b 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java @@ -96,6 +96,10 @@ abstract class BaseDataReader implements Closeable { this.currentIterator = CloseableIterator.empty(); } + protected Table table() { + return table; + } + public boolean next() throws IOException { try { while (true) { diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index d620faa979f6..ad17ba52628d 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -27,8 +27,10 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; @@ -37,10 +39,12 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.TypeUtil; import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnarBatch; class BatchDataReader extends BaseDataReader { @@ -71,17 +75,24 @@ CloseableIterator open(FileScanTask task) { InputFile location = getInputFile(task); Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); if (task.file().format() == FileFormat.PARQUET) { + SparkDeleteFilter deleteFilter = deleteFilter(task); + // get required schema for filtering out equality-delete rows in case equality-delete uses + // columns are + // not selected. 
+ Schema requiredSchema = requiredSchema(deleteFilter); + Parquet.ReadBuilder builder = Parquet.read(location) - .project(expectedSchema) + .project(requiredSchema) .split(task.start(), task.length()) .createBatchedReaderFunc( fileSchema -> VectorizedSparkParquetReaders.buildReader( - expectedSchema, + requiredSchema, fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, - idToConstant)) + idToConstant, + deleteFilter)) .recordsPerBatch(batchSize) .filter(task.residual()) .caseSensitive(caseSensitive) @@ -127,4 +138,37 @@ CloseableIterator open(FileScanTask task) { } return iter.iterator(); } + + private SparkDeleteFilter deleteFilter(FileScanTask task) { + return task.deletes().isEmpty() + ? null + : new SparkDeleteFilter(task, table().schema(), expectedSchema); + } + + private Schema requiredSchema(DeleteFilter deleteFilter) { + if (deleteFilter != null && deleteFilter.hasEqDeletes()) { + return deleteFilter.requiredSchema(); + } else { + return expectedSchema; + } + } + + private class SparkDeleteFilter extends DeleteFilter { + private final InternalRowWrapper asStructLike; + + SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { + super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema); + this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); + } + + @Override + protected StructLike asStructLike(InternalRow row) { + return asStructLike.wrap(row); + } + + @Override + protected InputFile getInputFile(String location) { + return BatchDataReader.this.getInputFile(location); + } + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index 4fcab5517d44..5fed852f678a 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -30,11 +30,9 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; class SparkBatchQueryScan extends SparkBatchScan { @@ -52,12 +50,10 @@ class SparkBatchQueryScan extends SparkBatchScan { SparkSession spark, Table table, SparkReadConf readConf, - boolean caseSensitive, Schema expectedSchema, - List filters, - CaseInsensitiveStringMap options) { + List filters) { - super(spark, table, readConf, caseSensitive, expectedSchema, filters, options); + super(spark, table, readConf, expectedSchema, filters); this.snapshotId = readConf.snapshotId(); this.asOfTimestamp = readConf.asOfTimestamp(); @@ -83,11 +79,9 @@ class SparkBatchQueryScan extends SparkBatchScan { "Cannot only specify option end-snapshot-id to do incremental scan"); } - // look for split behavior overrides in options - this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null); - this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null); - this.splitOpenFileCost = - Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null); + this.splitSize = readConf.splitSizeOption(); + this.splitLookback = 
readConf.splitLookbackOption(); + this.splitOpenFileCost = readConf.splitOpenFileCostOption(); } @Override diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 3b3d62d96226..541e4a03094f 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -55,7 +55,6 @@ import org.apache.spark.sql.connector.read.SupportsReportStatistics; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +70,6 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { private final Schema expectedSchema; private final List filterExpressions; private final boolean readTimestampWithoutZone; - private final CaseInsensitiveStringMap options; // lazy variables private StructType readSchema = null; @@ -80,19 +78,19 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { SparkSession spark, Table table, SparkReadConf readConf, - boolean caseSensitive, Schema expectedSchema, - List filters, - CaseInsensitiveStringMap options) { + List filters) { + + SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema); + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); this.table = table; this.readConf = readConf; - this.caseSensitive = caseSensitive; + this.caseSensitive = readConf.caseSensitive(); this.expectedSchema = expectedSchema; this.filterExpressions = filters != null ? filters : Collections.emptyList(); this.localityPreferred = readConf.localityEnabled(); this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone(); - this.options = options; } protected Table table() { @@ -193,11 +191,11 @@ public PartitionReaderFactory createReaderFactory() { boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks); - boolean readUsingBatch = - batchReadsEnabled - && hasNoDeleteFiles - && (allOrcFileScanTasks - || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + boolean batchReadOrc = hasNoDeleteFiles && allOrcFileScanTasks; + + boolean batchReadParquet = allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives; + + boolean readUsingBatch = batchReadsEnabled && (batchReadOrc || batchReadParquet); int batchSize = readUsingBatch ? 
batchSize(allParquetFileScanTasks, allOrcFileScanTasks) : 0; diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java index 4eb36b67ea40..1cf03e55c378 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java @@ -29,10 +29,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.FileScanTaskSetManager; import org.apache.iceberg.spark.SparkReadConf; -import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; class SparkFilesScan extends SparkBatchScan { private final String taskSetID; @@ -42,15 +40,10 @@ class SparkFilesScan extends SparkBatchScan { private List tasks = null; // lazy cache of tasks - SparkFilesScan( - SparkSession spark, - Table table, - SparkReadConf readConf, - boolean caseSensitive, - CaseInsensitiveStringMap options) { - super(spark, table, readConf, caseSensitive, table.schema(), ImmutableList.of(), options); + SparkFilesScan(SparkSession spark, Table table, SparkReadConf readConf) { + super(spark, table, readConf, table.schema(), ImmutableList.of()); - this.taskSetID = options.get(SparkReadOptions.FILE_SCAN_TASK_SET_ID); + this.taskSetID = readConf.fileScanTaskSetId(); this.splitSize = readConf.splitSize(); this.splitLookback = readConf.splitLookback(); this.splitOpenFileCost = readConf.splitOpenFileCost(); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java index 029585caf944..03ab3aa062d3 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java @@ -30,19 +30,15 @@ class SparkFilesScanBuilder implements ScanBuilder { private final SparkSession spark; private final Table table; private final SparkReadConf readConf; - private final boolean caseSensitive; - private final CaseInsensitiveStringMap options; SparkFilesScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) { this.spark = spark; this.table = table; this.readConf = new SparkReadConf(spark, table, options); - this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive")); - this.options = options; } @Override public Scan build() { - return new SparkFilesScan(spark, table, readConf, caseSensitive, options); + return new SparkFilesScan(spark, table, readConf); } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java index 8bc3f7d049cf..e43ff44519bc 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java @@ -36,12 +36,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadConf; -import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.util.TableScanUtil; 
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter; import org.apache.spark.sql.connector.read.Statistics; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter { @@ -62,13 +60,11 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter { SparkSession spark, Table table, SparkReadConf readConf, - boolean caseSensitive, boolean ignoreResiduals, Schema expectedSchema, - List filters, - CaseInsensitiveStringMap options) { + List filters) { - super(spark, table, readConf, caseSensitive, expectedSchema, filters, options); + super(spark, table, readConf, expectedSchema, filters); this.table = table; this.ignoreResiduals = ignoreResiduals; @@ -77,8 +73,7 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter { this.splitLookback = readConf.splitLookback(); this.splitOpenFileCost = readConf.splitOpenFileCost(); - Preconditions.checkArgument( - !options.containsKey(SparkReadOptions.SNAPSHOT_ID), "Can't set snapshot-id in options"); + Preconditions.checkArgument(readConf.snapshotId() == null, "Can't set snapshot-id in options"); Snapshot currentSnapshot = table.currentSnapshot(); this.snapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null; diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java new file mode 100644 index 000000000000..94f87c28741d --- /dev/null +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import org.apache.spark.sql.connector.catalog.MetadataColumn; +import org.apache.spark.sql.types.DataType; + +public class SparkMetadataColumn implements MetadataColumn { + + private final String name; + private final DataType dataType; + private final boolean isNullable; + + public SparkMetadataColumn(String name, DataType dataType, boolean isNullable) { + this.name = name; + this.dataType = dataType; + this.isNullable = isNullable; + } + + @Override + public String name() { + return name; + } + + @Override + public DataType dataType() { + return dataType; + } + + @Override + public boolean isNullable() { + return isNullable; + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 708d4378bc1b..f0ecdb8f13d0 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -58,7 +58,6 @@ public class SparkScanBuilder private final SparkSession spark; private final Table table; private final SparkReadConf readConf; - private final CaseInsensitiveStringMap options; private final List metaColumns = Lists.newArrayList(); private Schema schema = null; @@ -74,8 +73,7 @@ public class SparkScanBuilder this.table = table; this.schema = schema; this.readConf = new SparkReadConf(spark, table, options); - this.options = options; - this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive")); + this.caseSensitive = readConf.caseSensitive(); } SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) { @@ -171,25 +169,12 @@ private Schema schemaWithMetadataColumns() { @Override public Scan build() { return new SparkBatchQueryScan( - spark, - table, - readConf, - caseSensitive, - schemaWithMetadataColumns(), - filterExpressions, - options); + spark, table, readConf, schemaWithMetadataColumns(), filterExpressions); } public Scan buildMergeScan() { return new SparkMergeScan( - spark, - table, - readConf, - caseSensitive, - ignoreResiduals, - schemaWithMetadataColumns(), - filterExpressions, - options); + spark, table, readConf, ignoreResiduals, schemaWithMetadataColumns(), filterExpressions); } @Override diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 0f0c86fe89ee..de56696d0acc 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Partitioning; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -46,6 +48,8 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.MetadataColumn; +import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns; import org.apache.spark.sql.connector.catalog.SupportsRead; import org.apache.spark.sql.connector.catalog.SupportsWrite; import org.apache.spark.sql.connector.catalog.TableCapability; @@ -57,6 +61,8 @@ import 
org.apache.spark.sql.connector.write.LogicalWriteInfo; import org.apache.spark.sql.connector.write.WriteBuilder; import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.slf4j.Logger; @@ -67,7 +73,8 @@ public class SparkTable SupportsRead, SupportsWrite, ExtendedSupportsDelete, - SupportsMerge { + SupportsMerge, + SupportsMetadataColumns { private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class); @@ -167,6 +174,17 @@ public Set capabilities() { return CAPABILITIES; } + @Override + public MetadataColumn[] metadataColumns() { + DataType sparkPartitionType = SparkSchemaUtil.convert(Partitioning.partitionType(table())); + return new MetadataColumn[] { + new SparkMetadataColumn(MetadataColumns.SPEC_ID.name(), DataTypes.IntegerType, false), + new SparkMetadataColumn(MetadataColumns.PARTITION_COLUMN_NAME, sparkPartitionType, true), + new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false), + new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false) + }; + } + @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) { diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 3ba40bc88582..8d955bdd21e8 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -51,7 +51,6 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -98,6 +97,7 @@ class SparkWrite { private final String queryId; private final FileFormat format; private final String applicationId; + private final boolean wapEnabled; private final String wapId; private final long targetFileSize; private final Schema writeSchema; @@ -120,6 +120,7 @@ class SparkWrite { this.queryId = writeInfo.queryId(); this.format = writeConf.dataFileFormat(); this.applicationId = applicationId; + this.wapEnabled = writeConf.wapEnabled(); this.wapId = writeConf.wapId(); this.targetFileSize = writeConf.targetDataFileSize(); this.writeSchema = writeSchema; @@ -156,15 +157,6 @@ StreamingWrite asStreamingOverwrite() { return new StreamingOverwrite(); } - private boolean isWapTable() { - return Boolean.parseBoolean( - table - .properties() - .getOrDefault( - TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, - TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT)); - } - // the writer factory works for both batch and streaming private WriterFactory createWriterFactory() { // broadcast the table metadata as the writer factory will be sent to executors @@ -188,7 +180,7 @@ private void commitOperation(SnapshotUpdate operation, String description) { CommitMetadata.commitProperties().forEach(operation::set); } - if (isWapTable() && wapId != null) { + if (wapEnabled && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot 
operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId); diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 929d08f2cdb6..653ebcb01964 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -19,17 +19,22 @@ package org.apache.iceberg.spark.data; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.arrow.vector.NullCheckingForGet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.Files; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; @@ -38,6 +43,8 @@ import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.Types; @@ -160,6 +167,69 @@ public void testReadRowNumbers() throws IOException { readAndValidate(null, null, null, EXPECTED_ROWS); } + @Test + public void testReadRowNumbersWithDelete() throws IOException { + if (vectorized) { + List expectedRowsAfterDelete = Lists.newArrayList(EXPECTED_ROWS); + // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and + // [100, 200) + for (int i = 1; i <= 5; i++) { + expectedRowsAfterDelete.remove(98); + } + + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA); + + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex(); + deletedRowPos.delete(98, 103); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + + builder.createBatchedReaderFunc( + fileSchema -> + VectorizedSparkParquetReaders.buildReader( + PROJECTION_SCHEMA, + fileSchema, + NullCheckingForGet.NULL_CHECKING_ENABLED, + Maps.newHashMap(), + deleteFilter)); + builder.recordsPerBatch(RECORDS_PER_BATCH); + + validate(expectedRowsAfterDelete, builder); + } + } + + private class CustomizedPositionDeleteIndex implements PositionDeleteIndex { + private final Set deleteIndex; + + private CustomizedPositionDeleteIndex() { + deleteIndex = Sets.newHashSet(); + } + + @Override + public void delete(long position) { + deleteIndex.add(position); + } + + @Override + public void delete(long posStart, long posEnd) { + for (long l = posStart; l < posEnd; l++) { + delete(l); + } + } + + @Override + public boolean isDeleted(long position) { + return 
deleteIndex.contains(position); + } + + @Override + public boolean isEmpty() { + return deleteIndex.isEmpty(); + } + } + @Test public void testReadRowNumbersWithFilter() throws IOException { // current iceberg supports row group filter. @@ -216,6 +286,11 @@ private void readAndValidate( builder = builder.split(splitStart, splitLength); } + validate(expected, builder); + } + + private void validate(List expected, Parquet.ReadBuilder builder) + throws IOException { try (CloseableIterable reader = vectorized ? batchesToRows(builder.build()) : builder.build()) { final Iterator actualRows = reader.iterator(); diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java index f61545df79a0..14e3e381ff53 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.spark.source; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSessionCatalog; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.Identifier; @@ -30,14 +33,14 @@ public class TestSparkCatalog @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - String[] parts = ident.name().split("\\$", 2); - if (parts.length == 2) { - TestTables.TestTable table = TestTables.load(parts[0]); - String[] metadataColumns = parts[1].split(","); - return new SparkTestTable(table, metadataColumns, false); - } else { - TestTables.TestTable table = TestTables.load(ident.name()); - return new SparkTestTable(table, null, false); + TableIdentifier tableIdentifier = Spark3Util.identifierToTableIdentifier(ident); + Namespace namespace = tableIdentifier.namespace(); + + TestTables.TestTable table = TestTables.load(tableIdentifier.toString()); + if (table == null && namespace.equals(Namespace.of("default"))) { + table = TestTables.load(tableIdentifier.name()); } + + return new SparkTable(table, false); } } diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java index 5ee042f55e66..0df86a1ecb07 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java @@ -28,6 +28,7 @@ import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; @@ -120,8 +121,6 @@ public void dropTable() { TestTables.clearTables(); } - // TODO: remove testing workarounds once we compile against Spark 3.2 - @Test public void testSpecAndPartitionMetadataColumns() { // TODO: support metadata structs in vectorized ORC reads @@ -153,9 +152,7 @@ public void testSpecAndPartitionMetadataColumns() { assertEquals( "Rows must match", expected, - sql( - "SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", - TABLE_NAME)); + sql("SELECT _spec_id, _partition FROM %s 
ORDER BY _spec_id", TABLE_NAME)); } @Test @@ -169,7 +166,43 @@ public void testPartitionMetadataColumnWithUnknownTransforms() { "Should fail to query the partition metadata column", ValidationException.class, "Cannot build table partition type, unknown transforms", - () -> sql("SELECT _partition FROM `%s$_partition`", TABLE_NAME)); + () -> sql("SELECT _partition FROM %s", TABLE_NAME)); + } + + @Test + public void testConflictingColumns() { + table + .updateSchema() + .addColumn(MetadataColumns.SPEC_ID.name(), Types.IntegerType.get()) + .addColumn(MetadataColumns.FILE_PATH.name(), Types.StringType.get()) + .commit(); + + sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1', -1, 'path/to/file')", TABLE_NAME); + + assertEquals( + "Rows must match", + ImmutableList.of(row(1L, "a1")), + sql("SELECT id, category FROM %s", TABLE_NAME)); + + AssertHelpers.assertThrows( + "Should fail to query conflicting columns", + ValidationException.class, + "column names conflict", + () -> sql("SELECT * FROM %s", TABLE_NAME)); + + table.refresh(); + + table + .updateSchema() + .renameColumn(MetadataColumns.SPEC_ID.name(), "_renamed" + MetadataColumns.SPEC_ID.name()) + .renameColumn( + MetadataColumns.FILE_PATH.name(), "_renamed" + MetadataColumns.FILE_PATH.name()) + .commit(); + + assertEquals( + "Rows must match", + ImmutableList.of(row(0, null, -1)), + sql("SELECT _spec_id, _partition, _renamed_spec_id FROM %s", TABLE_NAME)); } private void createAndInitTable() throws IOException { diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 462f34530725..575555d745c2 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -50,6 +50,8 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.Dataset; @@ -60,12 +62,25 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + private final boolean vectorized; + + public TestSparkReaderDeletes(boolean vectorized) { + this.vectorized = vectorized; + } + + @Parameterized.Parameters(name = "vectorized = {0}") + public static Object[][] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } @BeforeClass public static void startMetastoreAndSpark() { @@ -108,7 +123,15 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableOperations ops = ((BaseTable) table).operations(); TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); - + if (vectorized) { + table + .updateProperties() + .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") + .set( + TableProperties.PARQUET_BATCH_SIZE, + "4") // split 7 records to two batches to cover more code paths + .commit(); + } return table; } @@ 
-242,4 +265,32 @@ public void testReadEqualityDeleteRows() throws IOException { Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size()); Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet); } + + @Test + public void testPosDeletesAllRowsInBatch() throws IOException { + // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all + // deleted. + List> deletes = + Lists.newArrayList( + Pair.of(dataFile.path(), 0L), // id = 29 + Pair.of(dataFile.path(), 1L), // id = 43 + Pair.of(dataFile.path(), 2L), // id = 61 + Pair.of(dataFile.path(), 3L) // id = 89 + ); + + Pair posDeletes = + FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89); + StructLikeSet actual = rowSet(tableName, table, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } }
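
For illustration of the vectorized delete handling above: a position-delete-aware batch reader only needs, per batch, the mapping from surviving batch ordinals to their absolute file positions, which is the path the mocked DeleteFilter.deletedRowPositions() in testReadRowNumbersWithDelete exercises. The following is a minimal, self-contained sketch of building such a row-id mapping, assuming a plain java.util.function.LongPredicate stands in for Iceberg's PositionDeleteIndex; the class and method names (RowIdMappingSketch, buildRowIdMapping) are hypothetical and this is not the code added by this change.

import java.util.Arrays;
import java.util.function.LongPredicate;

public class RowIdMappingSketch {

  // Returns the ordinals (0..batchSize-1) of rows in a batch that survive position deletes,
  // given the absolute file position of the first row in the batch.
  static int[] buildRowIdMapping(long batchStartPos, int batchSize, LongPredicate isDeleted) {
    int[] rowIds = new int[batchSize];
    int live = 0;
    for (int ordinal = 0; ordinal < batchSize; ordinal++) {
      if (!isDeleted.test(batchStartPos + ordinal)) {
        rowIds[live++] = ordinal;
      }
    }
    // an empty result means every row in the batch was deleted
    return Arrays.copyOf(rowIds, live);
  }

  public static void main(String[] args) {
    // Hypothetical example: positions 98..102 are deleted (as in the mocked test above);
    // for a batch of 8 rows starting at file position 96, ordinals 0, 1 and 7 survive.
    LongPredicate deleted = pos -> pos >= 98 && pos < 103;
    System.out.println(Arrays.toString(buildRowIdMapping(96, 8, deleted))); // [0, 1, 7]
  }
}

A mapping like this can be handed to the columnar accessors so Spark only sees surviving rows, instead of copying the batch; the example batch boundaries here are illustrative and are not tied to RECORDS_PER_BATCH in the test.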