diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java index f756c4cde015..99586f2503c2 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java @@ -52,6 +52,7 @@ public abstract class SparkContentFile implements ContentFile { private final int keyMetadataPosition; private final int splitOffsetsPosition; private final int sortOrderIdPosition; + private final int fileSpecIdPosition; private final int equalityIdsPosition; private final Type lowerBoundsType; private final Type upperBoundsType; @@ -100,6 +101,7 @@ public abstract class SparkContentFile implements ContentFile { this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name()); this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name()); this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name()); + this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name()); this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name()); } @@ -120,7 +122,10 @@ public Long pos() { @Override public int specId() { - return -1; + if (wrapped.isNullAt(fileSpecIdPosition)) { + return -1; + } + return wrapped.getAs(fileSpecIdPosition); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java new file mode 100644 index 000000000000..b9dc46f5e1bc --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.min; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.ImmutableRemoveDanglingDeleteFiles; +import org.apache.iceberg.actions.RemoveDanglingDeleteFiles; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.spark.SparkDeleteFile; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An action that removes dangling delete files from the current snapshot. A delete file is dangling + * if its deletes no longer applies to any live data files. + * + *

<p>The following dangling delete files are removed:
+ *
+ * <ul>
+ *   <li>Position delete files with a data sequence number less than that of any data file in the
+ *       same partition
+ *   <li>Equality delete files with a data sequence number less than or equal to that of any data
+ *       file in the same partition
+ * </ul>
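+ *
+ * <p>Intended usage, sketched here assuming an active Spark session and a loaded {@code Table}
+ * handle; callers normally reach this action through {@code SparkActions} rather than
+ * constructing it directly:
+ *
+ * <pre>{@code
+ * RemoveDanglingDeleteFiles.Result result =
+ *     SparkActions.get().removeDanglingDeleteFiles(table).execute();
+ * Iterable<DeleteFile> removed = result.removedDeleteFiles();
+ * }</pre>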
+ */ +class RemoveDanglingDeletesSparkAction + extends BaseSnapshotUpdateSparkAction + implements RemoveDanglingDeleteFiles { + private static final Logger LOG = LoggerFactory.getLogger(RemoveDanglingDeletesSparkAction.class); + private final Table table; + + protected RemoveDanglingDeletesSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + } + + @Override + protected RemoveDanglingDeletesSparkAction self() { + return this; + } + + public Result execute() { + if (table.specs().size() == 1 && table.spec().isUnpartitioned()) { + // ManifestFilterManager already performs this table-wide delete on each commit + return ImmutableRemoveDanglingDeleteFiles.Result.builder() + .removedDeleteFiles(Collections.emptyList()) + .build(); + } + String desc = String.format("Removing dangling delete files in %s", table.name()); + JobGroupInfo info = newJobGroupInfo("REMOVE-DELETES", desc); + return withJobGroupInfo(info, this::doExecute); + } + + Result doExecute() { + RewriteFiles rewriteFiles = table.newRewrite(); + List danglingDeletes = findDanglingDeletes(); + for (DeleteFile deleteFile : danglingDeletes) { + LOG.debug("Removing dangling delete file {}", deleteFile.path()); + rewriteFiles.deleteFile(deleteFile); + } + if (!danglingDeletes.isEmpty()) { + commit(rewriteFiles); + } + return ImmutableRemoveDanglingDeleteFiles.Result.builder() + .removedDeleteFiles(danglingDeletes) + .build(); + } + + /** + * Dangling delete files can be identified with following steps + * + *
<ol>
+ *   <li>Group data files by partition keys and find the minimum data sequence number in each
+ *       group.
+ *   <li>Left outer join delete files with partition-grouped data files on partition keys.
+ *   <li>Find dangling deletes by comparing each delete file's sequence number to its partition's
+ *       minimum data sequence number.
+ *   <li>Collect the resulting rows to the driver and use {@link SparkDeleteFile} to wrap them as
+ *       valid delete files.
+ * </ol>
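+ *
+ * <p>For example, if the minimum data sequence number among live data files in a partition is 3,
+ * a position delete file at sequence number 2 is dangling (2 &lt; 3) and an equality delete file
+ * at sequence number 3 is dangling (3 &lt;= 3), while a position delete file at sequence number 3
+ * still applies and is kept.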
+ */ + private List findDanglingDeletes() { + Dataset minSequenceNumberByPartition = + loadMetadataTable(table, MetadataTableType.ENTRIES) + // find live data files + .filter("data_file.content == 0 AND status < 2") + .selectExpr( + "data_file.partition as partition", + "data_file.spec_id as spec_id", + "sequence_number") + .groupBy("partition", "spec_id") + .agg(min("sequence_number")) + .toDF("grouped_partition", "grouped_spec_id", "min_data_sequence_number"); + Dataset deleteEntries = + loadMetadataTable(table, MetadataTableType.ENTRIES) + // find live delete files + .filter("data_file.content != 0 AND status < 2"); + Column joinOnPartition = + deleteEntries + .col("data_file.spec_id") + .equalTo(minSequenceNumberByPartition.col("grouped_spec_id")) + .and( + deleteEntries + .col("data_file.partition") + .equalTo(minSequenceNumberByPartition.col("grouped_partition"))); + Column filterOnDanglingDeletes = + col("min_data_sequence_number") + // delete fies without any data files in partition + .isNull() + // position delete files without any applicable data files in partition + .or( + col("data_file.content") + .equalTo("1") + .and(col("sequence_number").$less(col("min_data_sequence_number")))) + // equality delete files without any applicable data files in the partition + .or( + col("data_file.content") + .equalTo("2") + .and(col("sequence_number").$less$eq(col("min_data_sequence_number")))); + Dataset danglingDeletes = + deleteEntries + .join(minSequenceNumberByPartition, joinOnPartition, "left") + .filter(filterOnDanglingDeletes) + .select("data_file.*"); + return danglingDeletes.collectAsList().stream() + // map on driver because SparkDeleteFile is not serializable + .map(row -> deleteFileWrapper(danglingDeletes.schema(), row)) + .collect(Collectors.toList()); + } + + private DeleteFile deleteFileWrapper(StructType sparkFileType, Row row) { + int specId = row.getInt(row.fieldIndex("spec_id")); + Types.StructType combinedFileType = DataFile.getType(Partitioning.partitionType(table)); + // Set correct spec id + Types.StructType projection = DataFile.getType(table.specs().get(specId).partitionType()); + return new SparkDeleteFile(combinedFileType, projection, sparkFileType).wrap(row); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index a4c6642a3edf..0b2bbb3dfc39 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -40,6 +40,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.actions.FileRewriter; import org.apache.iceberg.actions.ImmutableRewriteDataFiles; +import org.apache.iceberg.actions.ImmutableRewriteDataFiles.Result.Builder; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFilesCommitManager; import org.apache.iceberg.actions.RewriteFileGroup; @@ -53,6 +54,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import 
org.apache.iceberg.relocated.com.google.common.collect.Queues; @@ -82,7 +84,8 @@ public class RewriteDataFilesSparkAction TARGET_FILE_SIZE_BYTES, USE_STARTING_SEQUENCE_NUMBER, REWRITE_JOB_ORDER, - OUTPUT_SPEC_ID); + OUTPUT_SPEC_ID, + REMOVE_DANGLING_DELETES); private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT = ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build(); @@ -93,6 +96,7 @@ public class RewriteDataFilesSparkAction private int maxConcurrentFileGroupRewrites; private int maxCommits; private boolean partialProgressEnabled; + private boolean removeDanglingDeletes; private boolean useStartingSequenceNumber; private RewriteJobOrder rewriteJobOrder; private FileRewriter rewriter = null; @@ -173,11 +177,17 @@ public RewriteDataFiles.Result execute() { Stream groupStream = toGroupStream(ctx, fileGroupsByPartition); - if (partialProgressEnabled) { - return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId)); - } else { - return doExecute(ctx, groupStream, commitManager(startingSnapshotId)); + Builder resultBuilder = + partialProgressEnabled + ? doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId)) + : doExecute(ctx, groupStream, commitManager(startingSnapshotId)); + if (removeDanglingDeletes) { + RemoveDanglingDeletesSparkAction action = + new RemoveDanglingDeletesSparkAction(spark(), table); + int removedCount = Iterables.size(action.execute().removedDeleteFiles()); + resultBuilder.removedDeleteFilesCount(removedCount); } + return resultBuilder.build(); } StructLikeMap>> planFileGroups(long startingSnapshotId) { @@ -261,7 +271,7 @@ RewriteDataFilesCommitManager commitManager(long startingSnapshotId) { return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber); } - private Result doExecute( + private Builder doExecute( RewriteExecutionContext ctx, Stream groupStream, RewriteDataFilesCommitManager commitManager) { @@ -323,10 +333,10 @@ private Result doExecute( List rewriteResults = rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList()); - return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build(); + return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults); } - private Result doExecuteWithPartialProgress( + private Builder doExecuteWithPartialProgress( RewriteExecutionContext ctx, Stream groupStream, RewriteDataFilesCommitManager commitManager) { @@ -372,8 +382,7 @@ private Result doExecuteWithPartialProgress( commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList()); return ImmutableRewriteDataFiles.Result.builder() .rewriteResults(rewriteResults) - .rewriteFailures(rewriteFailures) - .build(); + .rewriteFailures(rewriteFailures); } Stream toGroupStream( @@ -435,6 +444,10 @@ void validateAndInitOptions() { PropertyUtil.propertyAsBoolean( options(), USE_STARTING_SEQUENCE_NUMBER, USE_STARTING_SEQUENCE_NUMBER_DEFAULT); + removeDanglingDeletes = + PropertyUtil.propertyAsBoolean( + options(), REMOVE_DANGLING_DELETES, REMOVE_DANGLING_DELETES_DEFAULT); + rewriteJobOrder = RewriteJobOrder.fromName( PropertyUtil.propertyAsString(options(), REWRITE_JOB_ORDER, REWRITE_JOB_ORDER_DEFAULT)); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java index f845386d30c4..ba9fa2e7b4db 100644 --- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java @@ -21,6 +21,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.actions.ActionsProvider; import org.apache.iceberg.actions.ComputeTableStats; +import org.apache.iceberg.actions.RemoveDanglingDeleteFiles; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier; import org.apache.spark.sql.SparkSession; @@ -102,4 +103,9 @@ public RewritePositionDeleteFilesSparkAction rewritePositionDeletes(Table table) public ComputeTableStats computeTableStats(Table table) { return new ComputeTableStatsSparkAction(spark, table); } + + @Override + public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) { + return new RemoveDanglingDeletesSparkAction(spark, table); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java new file mode 100644 index 000000000000..8ac9be00ae90 --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.RemoveDanglingDeleteFiles; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Encoders; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.Tuple2; + +public class TestRemoveDanglingDeleteAction extends SparkTestBase { + private static final HadoopTables TABLES = new HadoopTables(new Configuration()); + private static final Schema SCHEMA = + new Schema( + optional(1, "c1", Types.StringType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + static final DataFile FILE_A = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=a") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_A2 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=a") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_B = + DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=b") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_B2 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=b") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_C = + DataFiles.builder(SPEC) + .withPath("/path/to/data-c.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=c") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_C2 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-c.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=c") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_D = + DataFiles.builder(SPEC) + .withPath("/path/to/data-d.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=d") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_D2 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-d.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=d") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile 
FILE_A_POS_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-a-pos-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=a") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile FILE_A2_POS_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-a2-pos-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=a") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile FILE_A_EQ_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofEqualityDeletes() + .withPath("/path/to/data-a-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=a") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile FILE_A2_EQ_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofEqualityDeletes() + .withPath("/path/to/data-a2-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=a") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile FILE_B_POS_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-b-pos-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=b") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile FILE_B2_POS_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-b2-pos-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=b") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile FILE_B_EQ_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofEqualityDeletes() + .withPath("/path/to/data-b-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=b") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile FILE_B2_EQ_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofEqualityDeletes() + .withPath("/path/to/data-b2-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=b") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_UNPARTITIONED = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-unpartitioned.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + static final DeleteFile FILE_UNPARTITIONED_POS_DELETE = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofEqualityDeletes() + .withPath("/path/to/data-unpartitioned-pos-deletes.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + static final DeleteFile FILE_UNPARTITIONED_EQ_DELETE = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofEqualityDeletes() + .withPath("/path/to/data-unpartitioned-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + private String tableLocation = null; + private Table table; + + @Before + public void before() throws Exception { + File tableDir = temp.newFolder(); + this.tableLocation = tableDir.toURI().toString(); + } + + @After + public void after() { + TABLES.dropTable(tableLocation); + } + + private void setupPartitionedTable() { + this.table = + TABLES.create( + SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), 
tableLocation); + } + + private void setupUnpartitionedTable() { + this.table = + TABLES.create( + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), + tableLocation); + } + + @Test + public void testPartitionedDeletesWithLesserSeqNo() { + setupPartitionedTable(); + // Add Data Files + table.newAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit(); + // Add Delete Files + table + .newRowDelta() + .addDeletes(FILE_A_POS_DELETES) + .addDeletes(FILE_A2_POS_DELETES) + .addDeletes(FILE_B_POS_DELETES) + .addDeletes(FILE_B2_POS_DELETES) + .addDeletes(FILE_A_EQ_DELETES) + .addDeletes(FILE_A2_EQ_DELETES) + .addDeletes(FILE_B_EQ_DELETES) + .addDeletes(FILE_B2_EQ_DELETES) + .commit(); + // Add More Data Files + table + .newAppend() + .appendFile(FILE_A2) + .appendFile(FILE_B2) + .appendFile(FILE_C2) + .appendFile(FILE_D2) + .commit(); + List> actual = + spark + .read() + .format("iceberg") + .load(tableLocation + "#entries") + .select("sequence_number", "data_file.file_path") + .sort("sequence_number", "data_file.file_path") + .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING())) + .collectAsList(); + List> expected = + ImmutableList.of( + Tuple2.apply(1L, FILE_B.path().toString()), + Tuple2.apply(1L, FILE_C.path().toString()), + Tuple2.apply(1L, FILE_D.path().toString()), + Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()), + Tuple2.apply(3L, FILE_A2.path().toString()), + Tuple2.apply(3L, FILE_B2.path().toString()), + Tuple2.apply(3L, FILE_C2.path().toString()), + Tuple2.apply(3L, FILE_D2.path().toString())); + assertThat(actual).isEqualTo(expected); + RemoveDanglingDeleteFiles.Result result = + SparkActions.get().removeDanglingDeleteFiles(table).execute(); + // All Delete files of the FILE A partition should be removed + // because there are no data files in partition with a lesser sequence number + Set removedDeleteFiles = + StreamSupport.stream(result.removedDeleteFiles().spliterator(), false) + .map(DeleteFile::path) + .collect(Collectors.toSet()); + assertThat(removedDeleteFiles) + .as("Expected 4 delete files removed") + .hasSize(4) + .containsExactlyInAnyOrder( + FILE_A_POS_DELETES.path(), + FILE_A2_POS_DELETES.path(), + FILE_A_EQ_DELETES.path(), + FILE_A2_EQ_DELETES.path()); + List> actualAfter = + spark + .read() + .format("iceberg") + .load(tableLocation + "#entries") + .filter("status < 2") // live files + .select("sequence_number", "data_file.file_path") + .sort("sequence_number", "data_file.file_path") + .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING())) + .collectAsList(); + List> expectedAfter = + ImmutableList.of( + Tuple2.apply(1L, FILE_B.path().toString()), + Tuple2.apply(1L, FILE_C.path().toString()), + Tuple2.apply(1L, FILE_D.path().toString()), + Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()), + Tuple2.apply(3L, FILE_A2.path().toString()), + Tuple2.apply(3L, FILE_B2.path().toString()), + Tuple2.apply(3L, 
FILE_C2.path().toString()), + Tuple2.apply(3L, FILE_D2.path().toString())); + assertThat(actualAfter).isEqualTo(expectedAfter); + } + + @Test + public void testPartitionedDeletesWithEqSeqNo() { + setupPartitionedTable(); + // Add Data Files + table.newAppend().appendFile(FILE_A).appendFile(FILE_C).appendFile(FILE_D).commit(); + // Add Data Files with EQ and POS deletes + table + .newRowDelta() + .addRows(FILE_A2) + .addRows(FILE_B2) + .addRows(FILE_C2) + .addRows(FILE_D2) + .addDeletes(FILE_A_POS_DELETES) + .addDeletes(FILE_A2_POS_DELETES) + .addDeletes(FILE_A_EQ_DELETES) + .addDeletes(FILE_A2_EQ_DELETES) + .addDeletes(FILE_B_POS_DELETES) + .addDeletes(FILE_B2_POS_DELETES) + .addDeletes(FILE_B_EQ_DELETES) + .addDeletes(FILE_B2_EQ_DELETES) + .commit(); + List> actual = + spark + .read() + .format("iceberg") + .load(tableLocation + "#entries") + .select("sequence_number", "data_file.file_path") + .sort("sequence_number", "data_file.file_path") + .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING())) + .collectAsList(); + List> expected = + ImmutableList.of( + Tuple2.apply(1L, FILE_A.path().toString()), + Tuple2.apply(1L, FILE_C.path().toString()), + Tuple2.apply(1L, FILE_D.path().toString()), + Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A2.path().toString()), + Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B2.path().toString()), + Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_C2.path().toString()), + Tuple2.apply(2L, FILE_D2.path().toString())); + assertThat(actual).isEqualTo(expected); + RemoveDanglingDeleteFiles.Result result = + SparkActions.get().removeDanglingDeleteFiles(table).execute(); + // Eq Delete files of the FILE B partition should be removed + // because there are no data files in partition with a lesser sequence number + Set removedDeleteFiles = + StreamSupport.stream(result.removedDeleteFiles().spliterator(), false) + .map(DeleteFile::path) + .collect(Collectors.toSet()); + assertThat(removedDeleteFiles) + .as("Expected two delete files removed") + .hasSize(2) + .containsExactlyInAnyOrder(FILE_B_EQ_DELETES.path(), FILE_B2_EQ_DELETES.path()); + List> actualAfter = + spark + .read() + .format("iceberg") + .load(tableLocation + "#entries") + .filter("status < 2") // live files + .select("sequence_number", "data_file.file_path") + .sort("sequence_number", "data_file.file_path") + .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING())) + .collectAsList(); + List> expectedAfter = + ImmutableList.of( + Tuple2.apply(1L, FILE_A.path().toString()), + Tuple2.apply(1L, FILE_C.path().toString()), + Tuple2.apply(1L, FILE_D.path().toString()), + Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A2.path().toString()), + Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()), + Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_B2.path().toString()), + Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()), + Tuple2.apply(2L, FILE_C2.path().toString()), + Tuple2.apply(2L, FILE_D2.path().toString())); + 
assertThat(actualAfter).isEqualTo(expectedAfter); + } + + @Test + public void testUnpartitionedTable() { + setupUnpartitionedTable(); + table + .newRowDelta() + .addDeletes(FILE_UNPARTITIONED_POS_DELETE) + .addDeletes(FILE_UNPARTITIONED_EQ_DELETE) + .commit(); + table.newAppend().appendFile(FILE_UNPARTITIONED).commit(); + RemoveDanglingDeleteFiles.Result result = + SparkActions.get().removeDanglingDeleteFiles(table).execute(); + assertThat(result.removedDeleteFiles()).as("No-op for unpartitioned tables").isEmpty(); + } +} diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index ba173d02498e..656b7358d1a5 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -24,6 +24,7 @@ import static org.apache.spark.sql.functions.current_date; import static org.apache.spark.sql.functions.date_add; import static org.apache.spark.sql.functions.expr; +import static org.apache.spark.sql.functions.min; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -55,6 +56,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RewriteJobOrder; import org.apache.iceberg.RowDelta; @@ -71,7 +73,9 @@ import org.apache.iceberg.actions.SizeBasedDataRewriter; import org.apache.iceberg.actions.SizeBasedFileRewriter; import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedFiles; @@ -84,6 +88,7 @@ import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -104,9 +109,11 @@ import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeMap; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.internal.SQLConf; import org.junit.Assert; @@ -129,6 +136,8 @@ public class TestRewriteDataFilesAction extends SparkTestBase { optional(2, "c2", Types.StringType.get()), optional(3, "c3", Types.StringType.get())); + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + @Rule public TemporaryFolder temp = new TemporaryFolder(); private final FileRewriteCoordinator coordinator = FileRewriteCoordinator.get(); @@ -330,6 +339,108 @@ public void testBinPackWithDeletes() { 
Assert.assertEquals("7 rows are removed", total - 7, actualRecords.size()); } + @Test + public void testRemoveDangledEqualityDeletesPartitionEvolution() { + Table table = + TABLES.create( + SCHEMA, + SPEC, + Collections.singletonMap(TableProperties.FORMAT_VERSION, "2"), + tableLocation); + // data seq = 1, write 4 files in 2 partitions + List records1 = + Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")); + writeRecords(records1); + List records2 = + Lists.newArrayList( + new ThreeColumnRecord(0, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(0, "DDDDDDDDDD", "DDDD")); + writeRecords(records2); + table.refresh(); + shouldHaveFiles(table, 4); + // data seq = 2 & 3, write 2 equality deletes in both partitions + writeEqDeleteRecord(table, "c1", 1, "c3", "AAAA"); + writeEqDeleteRecord(table, "c1", 2, "c3", "CCCC"); + table.refresh(); + Set existingDeletes = TestHelpers.deleteFiles(table); + assertThat(existingDeletes) + .as("Only one equality delete c1=1 is used in query planning") + .hasSize(1); + // partition evolution + table.refresh(); + table.updateSpec().addField(Expressions.ref("c3")).commit(); + // data seq = 4, write 2 new data files in both partitions for evolved spec + List records3 = + Lists.newArrayList( + new ThreeColumnRecord(1, "A", "CCCC"), new ThreeColumnRecord(2, "D", "DDDD")); + writeRecords(records3); + List originalData = currentData(); + RewriteDataFiles.Result result = + basicRewrite(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .filter(Expressions.equal("c1", 1)) + .option(RewriteDataFiles.REMOVE_DANGLING_DELETES, "true") + .execute(); + existingDeletes = TestHelpers.deleteFiles(table); + assertThat(existingDeletes).as("Shall pruned dangling deletes after rewrite").hasSize(0); + assertThat(result) + .extracting( + Result::addedDataFilesCount, + Result::rewrittenDataFilesCount, + Result::removedDeleteFilesCount) + .as("Should compact 3 data files into 2 and remove both dangled equality delete file") + .containsExactly(2, 3, 2); + shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 == 1", 5); + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + shouldHaveSnapshots(table, 7); + shouldHaveFiles(table, 5); + } + + @Test + public void testRemoveDangledPositionDeletesPartitionEvolution() { + Table table = + TABLES.create( + SCHEMA, + SPEC, + Collections.singletonMap(TableProperties.FORMAT_VERSION, "2"), + tableLocation); + // data seq = 1, write 4 files in 2 partitions + writeRecords(2, 2, 2); + List dataFilesBefore = TestHelpers.dataFiles(table, null); + shouldHaveFiles(table, 4); + // data seq = 2, write 1 position deletes in c1=1 + table + .newRowDelta() + .addDeletes(writePosDeletesToFile(table, dataFilesBefore.get(3), 1).get(0)) + .commit(); + // partition evolution + table.updateSpec().addField(Expressions.ref("c3")).commit(); + // data seq = 3, write 1 new data files in c1=1 for evolved spec + writeRecords(1, 1, 1); + shouldHaveFiles(table, 5); + List expectedRecords = currentData(); + Result result = + actions() + .rewriteDataFiles(table) + .filter(Expressions.equal("c1", 1)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .option(RewriteDataFiles.REMOVE_DANGLING_DELETES, "true") + .execute(); + assertThat(result) + .extracting( + Result::addedDataFilesCount, + Result::rewrittenDataFilesCount, + Result::removedDeleteFilesCount) + .as("Should rewrite 2 data files into 1 and remove 1 dangled 
position delete file") + .containsExactly(1, 2, 1); + shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 == 1", 3); + shouldHaveSnapshots(table, 5); + assertThat(table.currentSnapshot().summary().get("total-position-deletes")).isEqualTo("0"); + assertEquals("Rows must match", expectedRecords, currentData()); + } + @Test public void testBinPackWithDeleteAllData() { Map options = Maps.newHashMap(); @@ -1616,6 +1727,21 @@ protected void shouldHaveFiles(Table table, int numExpected) { Assert.assertEquals("Did not have the expected number of files", numExpected, numFiles); } + protected long shouldHaveMinSequenceNumberInPartition( + Table table, String partitionFilter, long expected) { + long actual = + SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES) + .filter("status != 2") + .filter(partitionFilter) + .select("sequence_number") + .agg(min("sequence_number")) + .as(Encoders.LONG()) + .collectAsList() + .get(0); + assertThat(actual).as("Did not have the expected min sequence number").isEqualTo(expected); + return actual; + } + protected void shouldHaveSnapshots(Table table, int expectedSnapshots) { table.refresh(); int actualSnapshots = Iterables.size(table.snapshots()); @@ -1812,6 +1938,11 @@ protected int averageFileSize(Table table) { .getAsDouble(); } + private void writeRecords(List records) { + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); + writeDF(df); + } + private void writeRecords(int files, int numRecords) { writeRecords(files, numRecords, 0); } @@ -1865,7 +1996,10 @@ private List writePosDeletes( table .io() .newOutputFile( - table.locationProvider().newDataLocation(UUID.randomUUID().toString())); + table + .locationProvider() + .newDataLocation( + FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()))); EncryptedOutputFile encryptedOutputFile = EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY); @@ -1891,6 +2025,63 @@ private List writePosDeletes( return results; } + private void writeEqDeleteRecord( + Table table, String partCol, Object partVal, String delCol, Object delVal) { + List equalityFieldIds = Lists.newArrayList(table.schema().findField(delCol).fieldId()); + Schema eqDeleteRowSchema = table.schema().select(delCol); + Record partitionRecord = + GenericRecord.create(table.schema().select(partCol)) + .copy(ImmutableMap.of(partCol, partVal)); + Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of(delCol, delVal)); + writeEqDeleteRecord(table, equalityFieldIds, partitionRecord, eqDeleteRowSchema, record); + } + + private void writeEqDeleteRecord( + Table table, + List equalityFieldIds, + Record partitionRecord, + Schema eqDeleteRowSchema, + Record deleteRecord) { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build(); + GenericAppenderFactory appenderFactory = + new GenericAppenderFactory( + table.schema(), + table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteRowSchema, + null); + EncryptedOutputFile file = + createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory); + EqualityDeleteWriter eqDeleteWriter = + appenderFactory.newEqDeleteWriter( + file, FileFormat.PARQUET, createPartitionKey(table, partitionRecord)); + try (EqualityDeleteWriter clsEqDeleteWriter = eqDeleteWriter) { + clsEqDeleteWriter.write(deleteRecord); + } catch (Exception e) { + throw new RuntimeException(e); + } + table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit(); + } + 
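+ // Derives the partition key for the table's current spec from a generic record; returns null
+ // for unpartitioned tables so that an unpartitioned output file is created below.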
+ private PartitionKey createPartitionKey(Table table, Record record) { + if (table.spec().isUnpartitioned()) { + return null; + } + PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); + partitionKey.partition(record); + return partitionKey; + } + + private EncryptedOutputFile createEncryptedOutputFile( + PartitionKey partition, OutputFileFactory fileFactory) { + if (partition == null) { + return fileFactory.newOutputFile(); + } else { + return fileFactory.newOutputFile(partition); + } + } + private SparkActions actions() { return SparkActions.get(); }
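End-to-end, this patch exposes dangling delete removal both as a standalone SparkActions action and as an opt-in step of rewriteDataFiles behind the new REMOVE_DANGLING_DELETES option. A minimal sketch of how a caller opts in during compaction, assuming an active Spark session and a loaded Iceberg Table handle named table; the wrapper class name is illustrative, while the calls mirror the tests above:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.actions.SizeBasedFileRewriter;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Hypothetical wrapper class, used only to frame the calls below.
    class RewriteWithDanglingDeleteRemoval {
      static RewriteDataFiles.Result rewrite(Table table) {
        // Compact all files, then drop delete files that no longer apply to any live data file
        // in their partition.
        return SparkActions.get()
            .rewriteDataFiles(table)
            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
            .option(RewriteDataFiles.REMOVE_DANGLING_DELETES, "true")
            .execute();
      }
    }

The returned result then reports removedDeleteFilesCount() alongside rewrittenDataFilesCount() and addedDataFilesCount(), as asserted in testRemoveDangledEqualityDeletesPartitionEvolution above.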