diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index bf7bd8917956..6f16794d5bfd 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -50,6 +50,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
@@ -75,6 +76,7 @@ class DeleteFileIndex {
   private final DeleteFileGroup globalDeletes;
   private final Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition;
   private final boolean isEmpty;
+  private final boolean useColumnStatsFiltering;
 
   /** @deprecated since 1.4.0, will be removed in 1.5.0. */
   @Deprecated
@@ -83,13 +85,14 @@ class DeleteFileIndex {
       long[] globalSeqs,
       DeleteFile[] globalDeletes,
       Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> deletesByPartition) {
-    this(specs, index(specs, globalSeqs, globalDeletes), index(specs, deletesByPartition));
+    this(specs, index(specs, globalSeqs, globalDeletes), index(specs, deletesByPartition), true);
   }
 
   private DeleteFileIndex(
       Map<Integer, PartitionSpec> specs,
       DeleteFileGroup globalDeletes,
-      Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition) {
+      Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition,
+      boolean useColumnStatsFiltering) {
     ImmutableMap.Builder<Integer, Types.StructType> builder = ImmutableMap.builder();
     specs.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
     this.partitionTypeById = builder.build();
@@ -97,6 +100,7 @@ private DeleteFileIndex(
     this.globalDeletes = globalDeletes;
     this.deletesByPartition = deletesByPartition;
     this.isEmpty = globalDeletes == null && deletesByPartition.isEmpty();
+    this.useColumnStatsFiltering = useColumnStatsFiltering;
   }
 
   public boolean isEmpty() {
@@ -148,7 +152,16 @@ DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
 
     if (globalDeletes == null && partitionDeletes == null) {
       return NO_DELETES;
+    } else if (useColumnStatsFiltering) {
+      return limitWithColumnStatsFiltering(sequenceNumber, file, partitionDeletes);
+    } else {
+      return limitWithoutColumnStatsFiltering(sequenceNumber, partitionDeletes);
     }
+  }
+
+  // limits deletes using sequence numbers and checks whether column stats overlap
+  private DeleteFile[] limitWithColumnStatsFiltering(
+      long sequenceNumber, DataFile file, DeleteFileGroup partitionDeletes) {
 
     Stream<IndexedDeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
@@ -167,6 +180,21 @@ DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
         .toArray(DeleteFile[]::new);
   }
 
+  // limits deletes using sequence numbers but skips expensive column stats filtering
+  private DeleteFile[] limitWithoutColumnStatsFiltering(
+      long sequenceNumber, DeleteFileGroup partitionDeletes) {
+
+    if (partitionDeletes == null) {
+      return globalDeletes.filter(sequenceNumber);
+    } else if (globalDeletes == null) {
+      return partitionDeletes.filter(sequenceNumber);
+    } else {
+      DeleteFile[] matchingGlobalDeletes = globalDeletes.filter(sequenceNumber);
+      DeleteFile[] matchingPartitionDeletes = partitionDeletes.filter(sequenceNumber);
+      return ObjectArrays.concat(matchingGlobalDeletes, matchingPartitionDeletes, DeleteFile.class);
+    }
+  }
+
   private static boolean canContainDeletesForFile(DataFile dataFile, IndexedDeleteFile deleteFile) {
     switch (deleteFile.content()) {
       case POSITION_DELETES:
@@ -483,6 +511,8 @@ private Collection<DeleteFile> loadDeleteFiles() {
     DeleteFileIndex build() {
       Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
 
+      boolean useColumnStatsFiltering = false;
+
       // build a map from (specId, partition) to delete file entries
       Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
       ListMultimap<Pair<Integer, StructLikeWrapper>, IndexedDeleteFile> deleteFilesByPartition =
@@ -494,7 +524,13 @@ DeleteFileIndex build() {
            wrappersBySpecId
                .computeIfAbsent(specId, id -> StructLikeWrapper.forType(spec.partitionType()))
                .copyFor(file.partition());
-        deleteFilesByPartition.put(Pair.of(specId, wrapper), new IndexedDeleteFile(spec, file));
+        IndexedDeleteFile indexedFile = new IndexedDeleteFile(spec, file);
+        deleteFilesByPartition.put(Pair.of(specId, wrapper), indexedFile);
+
+        if (!useColumnStatsFiltering) {
+          useColumnStatsFiltering = indexedFile.hasLowerAndUpperBounds();
+        }
+
         ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
       }
 
@@ -535,7 +571,8 @@ DeleteFileIndex build() {
        }
      }
 
-      return new DeleteFileIndex(specsById, globalDeletes, sortedDeletesByPartition);
+      return new DeleteFileIndex(
+          specsById, globalDeletes, sortedDeletesByPartition, useColumnStatsFiltering);
     }
 
     private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
@@ -597,7 +634,28 @@ private static class DeleteFileGroup {
       this.files = files;
     }
 
+    public DeleteFile[] filter(long seq) {
+      int start = findStartIndex(seq);
+
+      if (start >= files.length) {
+        return NO_DELETES;
+      }
+
+      DeleteFile[] matchingFiles = new DeleteFile[files.length - start];
+
+      for (int index = start; index < files.length; index++) {
+        matchingFiles[index - start] = files[index].wrapped();
+      }
+
+      return matchingFiles;
+    }
+
     public Stream<IndexedDeleteFile> limit(long seq) {
+      int start = findStartIndex(seq);
+      return Arrays.stream(files, start, files.length);
+    }
+
+    private int findStartIndex(long seq) {
       int pos = Arrays.binarySearch(seqs, seq);
       int start;
       if (pos < 0) {
@@ -612,7 +670,7 @@ public Stream<IndexedDeleteFile> limit(long seq) {
        }
      }
 
-      return Arrays.stream(files, start, files.length);
+      return start;
     }
 
     public Iterable<DeleteFile> referencedDeleteFiles() {
diff --git a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
new file mode 100644
index 000000000000..f73794acaf9e
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.spark.sql.functions.lit;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that evaluates the delete file index build and lookup performance.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=DeleteFileIndexBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-delete-file-index-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@Timeout(time = 20, timeUnit = TimeUnit.MINUTES)
+@BenchmarkMode(Mode.SingleShotTime)
+public class DeleteFileIndexBenchmark {
+
+  private static final String TABLE_NAME = "test_table";
+  private static final String PARTITION_COLUMN = "ss_ticket_number";
+
+  private static final int NUM_PARTITIONS = 50;
+  private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
+  private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
+  private static final int NUM_DELETE_FILES_PER_PARTITION = 100;
+  private static final int NUM_ROWS_PER_DATA_FILE = 500;
+
+  private final Configuration hadoopConf = new Configuration();
+  private SparkSession spark;
+  private Table table;
+
+  private List<DataFile> dataFiles;
+
+  @Setup
+  public void setupBenchmark() throws NoSuchTableException, ParseException {
+    setupSpark();
+    initTable();
+    initDataAndDeletes();
+    loadDataFiles();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    dropTable();
+    tearDownSpark();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void buildIndexAndLookup(Blackhole blackhole) {
+    DeleteFileIndex deletes = buildDeletes();
+    for (DataFile dataFile : dataFiles) {
+      DeleteFile[] deleteFiles = deletes.forDataFile(dataFile.dataSequenceNumber(), dataFile);
+      blackhole.consume(deleteFiles);
+    }
+  }
+
+  private void loadDataFiles() {
+    table.refresh();
+
+    Snapshot snapshot = table.currentSnapshot();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table.io(), snapshot.dataManifests(table.io()), ImmutableList.of());
+
+    try (CloseableIterable<ManifestEntry<DataFile>> entries = manifestGroup.entries()) {
+      List<DataFile> files = Lists.newArrayList();
+      for (ManifestEntry<DataFile> entry : entries) {
+        files.add(entry.file().copyWithoutStats());
+      }
+      this.dataFiles = files;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private DeleteFileIndex buildDeletes() {
+    table.refresh();
+
+    List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests(table.io());
+
+    return DeleteFileIndex.builderFor(table.io(), deleteManifests)
+        .specsById(table.specs())
+        .planWith(ThreadPools.getWorkerPool())
+        .build();
+  }
+
+  private DataFile loadAddedDataFile() {
+    table.refresh();
+
+    Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
+    return Iterables.getOnlyElement(addedDataFiles);
+  }
+
+  private DeleteFile loadAddedDeleteFile() {
+    table.refresh();
+
+    Iterable<DeleteFile> addedDeleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
+    return Iterables.getOnlyElement(addedDeleteFiles);
+  }
+
+  private void initDataAndDeletes() throws NoSuchTableException {
+    Schema schema = table.schema();
+    PartitionSpec spec = table.spec();
+    LocationProvider locations = table.locationProvider();
+
+    for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
+      Dataset<Row> inputDF =
+          randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
+              .drop(PARTITION_COLUMN)
+              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
+
+      for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        appendAsFile(inputDF);
+      }
+
+      DataFile dataFile = loadAddedDataFile();
+
+      sql(
+          "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
+          TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
+
+      DeleteFile deleteFile = loadAddedDeleteFile();
+
+      AppendFiles append = table.newFastAppend();
+
+      for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
+        DataFile replicaDataFile =
+            DataFiles.builder(spec)
+                .copy(dataFile)
+                .withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
+                .build();
+        append.appendFile(replicaDataFile);
+      }
+
+      append.commit();
+
+      RowDelta rowDelta = table.newRowDelta();
+
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
+        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
+        DeleteFile replicaDeleteFile =
+            FileMetadata.deleteFileBuilder(spec)
+                .copy(deleteFile)
+                .withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
+                .build();
+        rowDelta.addDeletes(replicaDeleteFile);
+      }
+
+      rowDelta.commit();
+    }
+  }
+
+  private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
+    df.coalesce(1).writeTo(TABLE_NAME).append();
+  }
+
+  private Dataset<Row> randomDataDF(Schema schema, int numRows) {
+    Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
+    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
+    StructType rowSparkType = SparkSchemaUtil.convert(schema);
+    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
+  }
+
+  private void setupSpark() {
+    this.spark =
+        SparkSession.builder()
+            .config("spark.ui.enabled", false)
+            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+            .config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName())
+            .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
+            .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+            .config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir())
+            .master("local[*]")
+            .getOrCreate();
+  }
+
+  private void tearDownSpark() {
+    spark.stop();
+  }
+
+  private void initTable() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s ( "
+            + " `ss_sold_date_sk` INT, "
+            + " `ss_sold_time_sk` INT, "
+            + " `ss_item_sk` INT, "
+            + " `ss_customer_sk` STRING, "
+            + " `ss_cdemo_sk` STRING, "
+            + " `ss_hdemo_sk` STRING, "
+            + " `ss_addr_sk` STRING, "
+            + " `ss_store_sk` STRING, "
+            + " `ss_promo_sk` STRING, "
+            + " `ss_ticket_number` INT, "
+            + " `ss_quantity` STRING, "
+            + " `ss_wholesale_cost` STRING, "
+            + " `ss_list_price` STRING, "
+            + " `ss_sales_price` STRING, "
+            + " `ss_ext_discount_amt` STRING, "
+            + " `ss_ext_sales_price` STRING, "
+            + " `ss_ext_wholesale_cost` STRING, "
+            + " `ss_ext_list_price` STRING, "
+            + " `ss_ext_tax` STRING, "
+            + " `ss_coupon_amt` STRING, "
+            + " `ss_net_paid` STRING, "
+            + " `ss_net_paid_inc_tax` STRING, "
+            + " `ss_net_profit` STRING "
+            + ")"
+            + "USING iceberg "
+            + "PARTITIONED BY (%s) "
+            + "TBLPROPERTIES ("
+            + " '%s' '%b',"
+            + " '%s' '%s',"
+            + " '%s' '%d')",
+        TABLE_NAME,
+        PARTITION_COLUMN,
+        TableProperties.MANIFEST_MERGE_ENABLED,
+        false,
+        TableProperties.DELETE_MODE,
+        RowLevelOperationMode.MERGE_ON_READ.modeName(),
+        TableProperties.FORMAT_VERSION,
+        2);
+
+    this.table = Spark3Util.loadIcebergTable(spark, TABLE_NAME);
+  }
+
+  private void dropTable() {
+    sql("DROP TABLE IF EXISTS %s PURGE", TABLE_NAME);
+  }
+
+  private String newWarehouseDir() {
+    return hadoopConf.get("hadoop.tmp.dir") + UUID.randomUUID();
+  }
+
+  @FormatMethod
+  private void sql(@FormatString String query, Object... args) {
+    spark.sql(String.format(query, args));
+  }
+}
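
Reviewer note: the hunks above show that the new fast path does a single binary search in findStartIndex(long seq) and then copies the tail of the sorted delete-file array, but they elide the body of the (pos < 0) branch. The snippet below is a minimal, self-contained sketch of that control flow for readers who want to see it in isolation. The class SequenceFilteredGroup, its fields, and the handling of the insertion point and of duplicate sequence numbers are illustrative assumptions based on the standard Arrays.binarySearch contract, not code copied from this patch.

import java.util.Arrays;

// Illustrative sketch only: SequenceFilteredGroup is a hypothetical stand-in for
// DeleteFileGroup, with String elements instead of wrapped DeleteFile instances.
class SequenceFilteredGroup {
  private final long[] seqs; // ascending apply sequence numbers, one per delete file
  private final String[] files;

  SequenceFilteredGroup(long[] seqs, String[] files) {
    this.seqs = seqs;
    this.files = files;
  }

  // mirrors the new filter(long seq): one binary search, then a straight array copy,
  // with no per-file column stats checks
  String[] filter(long seq) {
    int start = findStartIndex(seq);
    if (start >= files.length) {
      return new String[0];
    }
    return Arrays.copyOfRange(files, start, files.length);
  }

  // the patch extracts this search from limit(); the insertion-point handling and the
  // walk-back over equal sequence numbers are assumptions, since the hunk elides them
  private int findStartIndex(long seq) {
    int pos = Arrays.binarySearch(seqs, seq);
    int start = pos < 0 ? -(pos + 1) : pos;
    while (start > 0 && seqs[start - 1] >= seq) {
      start--; // include earlier entries that share the same sequence number
    }
    return start;
  }

  public static void main(String[] args) {
    SequenceFilteredGroup group =
        new SequenceFilteredGroup(new long[] {2, 2, 3, 5}, new String[] {"d1", "d2", "d3", "d4"});
    System.out.println(Arrays.toString(group.filter(3))); // [d3, d4]
    System.out.println(Arrays.toString(group.filter(9))); // []
  }
}

In the patch itself, limitWithoutColumnStatsFiltering applies this filter path to the global and partition groups and concatenates the results with ObjectArrays.concat, while the existing limit path with canContainDeletesForFile is kept whenever any indexed delete file carries lower and upper bounds (hasLowerAndUpperBounds during build).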