diff --git a/api/src/main/java/org/apache/iceberg/DeleteFiles.java b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
index 74d31a6dad81..dfdee112c938 100644
--- a/api/src/main/java/org/apache/iceberg/DeleteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
@@ -55,6 +55,17 @@ default DeleteFiles deleteFile(DataFile file) {
return this;
}
+ /**
+ * Delete a file tracked by a {@link DeleteFile} from the underlying table.
+ *
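+   * <p>A minimal usage sketch (assuming {@code danglingDelete} is a {@link DeleteFile} currently
+   * tracked by the table; the variable name is illustrative):
+   *
+   * <pre>{@code
+   * table.newDelete().deleteFile(danglingDelete).commit();
+   * }</pre>
+   *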
+ * @param file a DeleteFile to remove from the table
+ * @return this for method chaining
+ */
+ default DeleteFiles deleteFile(DeleteFile file) {
+ deleteFile(file.path());
+ return this;
+ }
+
/**
* Delete files that match an {@link Expression} on data rows from the table.
*
diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index e5b5766f918d..1219071b1b2d 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -64,4 +64,10 @@ default DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteReachableFiles");
}
+
+  /** Instantiates an action to remove dangling delete files from the table's current snapshot. */
+ default RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement removeDanglingDeleteFiles");
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java b/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
new file mode 100644
index 000000000000..b02eeb3e3cd4
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+
+/**
+ * An action that removes dangling delete files from the current snapshot. A delete file is
+ * dangling if its deletes no longer apply to any data file.
+ *
+ * <p>The following dangling delete files are removed:
+ *
+ * <ul>
+ *   <li>Position delete files with a sequence number less than that of all data files in the same
+ *       partition
+ *   <li>Equality delete files with a sequence number less than or equal to that of all data files
+ *       in the same partition
+ * </ul>
+ *
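+ * <p>A minimal usage sketch with the Spark implementation added in this change (the variable
+ * names are illustrative):
+ *
+ * <pre>{@code
+ * RemoveDanglingDeleteFiles.Result result =
+ *     SparkActions.get().removeDanglingDeleteFiles(table).execute();
+ * result.removedDeleteFiles().forEach(removed -> System.out.println(removed.path()));
+ * }</pre>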
+ */
+public interface RemoveDanglingDeleteFiles
+    extends Action<RemoveDanglingDeleteFiles, RemoveDanglingDeleteFiles.Result> {
+
+ /** The action result that contains a summary of the execution. */
+ interface Result {
+    /** Returns the removed delete files. */
+    List<DeleteFile> removedDeleteFiles();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
index 8ff7bb831ec9..33f1e5b97fe0 100644
--- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
+++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -54,6 +54,12 @@ public StreamingDelete deleteFile(DataFile file) {
return this;
}
+ @Override
+ public StreamingDelete deleteFile(DeleteFile file) {
+ delete(file);
+ return this;
+ }
+
@Override
public StreamingDelete deleteFromRowFilter(Expression expr) {
deleteByRowFilter(expr);
diff --git a/core/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFilesActionResult.java b/core/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFilesActionResult.java
new file mode 100644
index 000000000000..4f3a69ce67da
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFilesActionResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+public class RemoveDanglingDeleteFilesActionResult implements RemoveDanglingDeleteFiles.Result {
+
+ private static final RemoveDanglingDeleteFilesActionResult EMPTY =
+ new RemoveDanglingDeleteFilesActionResult(ImmutableList.of());
+
+  private final List<DeleteFile> removedDeleteFiles;
+
+  public RemoveDanglingDeleteFilesActionResult(List<DeleteFile> removedDeleteFiles) {
+    this.removedDeleteFiles = removedDeleteFiles;
+ }
+
+ public static RemoveDanglingDeleteFilesActionResult empty() {
+ return EMPTY;
+ }
+
+ @Override
+  public List<DeleteFile> removedDeleteFiles() {
+ return removedDeleteFiles;
+ }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index cdd80040fa9e..be5690790be6 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
@@ -58,20 +59,26 @@
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class BaseSparkAction<ThisT> {
+ public static final String USE_CACHING = "use-caching";
+ public static final boolean USE_CACHING_DEFAULT = true;
+
protected static final String MANIFEST = "Manifest";
protected static final String MANIFEST_LIST = "Manifest List";
protected static final String OTHERS = "Others";
@@ -361,4 +368,25 @@ static FileInfo toFileInfo(ContentFile<?> file) {
return new FileInfo(file.path().toString(), file.content().toString());
}
}
+
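+  // Materializes the dataset once so it can back multiple downstream jobs: either by caching it
+  // (the default) or, when the "use-caching" option is disabled, by forcing a shuffle at the
+  // current shuffle parallelism so recomputation stays bounded. When caching was used, the
+  // dataset is unpersisted after the supplied function returns.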
+  protected <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
+ Dataset reusableDS;
+ boolean useCaching =
+ PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
+ if (useCaching) {
+ reusableDS = ds.cache();
+ } else {
+ int parallelism = SQLConf.get().numShufflePartitions();
+ reusableDS =
+          ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
+ }
+
+ try {
+ return func.apply(reusableDS);
+ } finally {
+ if (useCaching) {
+ reusableDS.unpersist(false);
+ }
+ }
+ }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
new file mode 100644
index 000000000000..acc0bbf8217a
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+import static org.apache.spark.sql.functions.min;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFilesActionResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An action that removes dangling delete files from the current snapshot. A delete file is
+ * dangling if its deletes no longer apply to any data file.
+ *
+ * <p>The following dangling delete files are removed:
+ *
+ * <ul>
+ *   <li>Position delete files with a sequence number less than that of all data files in the same
+ *       partition
+ *   <li>Equality delete files with a sequence number less than or equal to that of all data files
+ *       in the same partition
+ * </ul>
+ *
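+ * <p>A minimal usage sketch (the {@code use-caching} option is optional; when set to {@code
+ * false} the entries dataset is re-materialized through a shuffle instead of being cached):
+ *
+ * <pre>{@code
+ * RemoveDanglingDeleteFiles.Result result =
+ *     SparkActions.get()
+ *         .removeDanglingDeleteFiles(table)
+ *         .option("use-caching", "false")
+ *         .execute();
+ * }</pre>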
+ */
+public class RemoveDanglingDeletesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RemoveDanglingDeletesSparkAction>
+ implements RemoveDanglingDeleteFiles {
+
+ private final Table table;
+
+ protected RemoveDanglingDeletesSparkAction(SparkSession spark, Table table) {
+ super(spark);
+ this.table = table;
+ }
+
+ @Override
+ protected RemoveDanglingDeletesSparkAction self() {
+ return this;
+ }
+
+ @Override
+ public Result execute() {
+ if (table.specs().size() == 1 && table.spec().isUnpartitioned()) {
+ // ManifestFilterManager already performs this table-wide on each commit
+ return RemoveDanglingDeleteFilesActionResult.empty();
+ }
+
+ String desc = String.format("Remove dangling delete files for %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("REMOVE-DANGLING-DELETES", desc);
+ return withJobGroupInfo(info, this::doExecute);
+ }
+
+ private Result doExecute() {
+    Dataset<Row> entries =
+ loadMetadataTable(table, ENTRIES)
+ .filter("status < 2") // live entries
+ .selectExpr(
+ "data_file.partition as partition",
+ "data_file.spec_id as spec_id",
+ "data_file.file_path as file_path",
+ "data_file.content as content",
+ "data_file.file_size_in_bytes as file_size_in_bytes",
+ "data_file.record_count as record_count",
+ "sequence_number");
+
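+    // Remove every dangling delete file found among the live entries in a single delete commit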
+ DeleteFiles deleteFiles = table.newDelete();
+    List<DeleteFile> toRemove = withReusableDS(entries, this::danglingDeletes);
+ toRemove.forEach(deleteFiles::deleteFile);
+ deleteFiles.commit();
+
+ return new RemoveDanglingDeleteFilesActionResult(toRemove);
+ }
+
+  /**
+   * Computes the dangling delete files.
+   *
+   * <ul>
+   *   <li>Group all files by partition and compute the minimum data file sequence number in each
+   *   <li>For each partition, position delete files with a sequence number less than the
+   *       partition's min_data_sequence_number are dangling
+   *   <li>For each partition, equality delete files with a sequence number less than or equal to
+   *       the partition's min_data_sequence_number are dangling
+   * </ul>
+   *
+   * @param entries dataset of file entries, marked by content (0 for data files, 1 for position
+   *     deletes, 2 for equality deletes)
+   * @return list of dangling delete files
+   */
+  private List<DeleteFile> danglingDeletes(Dataset<Row> entries) {
+    List<DeleteFile> removedDeleteFiles = Lists.newArrayList();
+
+ // Minimum sequence number of data files in each partition
+    Dataset<Row> minDataSeqNumberPerPartition =
+ entries
+ .filter("content == 0") // data files
+ .groupBy("partition", "spec_id")
+ .agg(min("sequence_number"))
+ .toDF("partition", "spec_id", "min_data_sequence_number");
+
+ // Dangling position delete files
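+    // The join matches each delete file entry to its partition's minimum data sequence number,
+    // comparing both the partition tuple and the spec id so that entries from different
+    // partition specs are never compared against each other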
+ Column joinCond =
+ minDataSeqNumberPerPartition
+ .col("partition")
+ .equalTo(entries.col("partition"))
+ .and(minDataSeqNumberPerPartition.col("spec_id").equalTo(entries.col("spec_id")));
+    Dataset<Row> posDeleteDs =
+ entries
+ .filter("content == 1") // position delete files
+ .join(minDataSeqNumberPerPartition, joinCond)
+ .filter("sequence_number < min_data_sequence_number")
+ .select(
+ entries.col("partition"),
+ entries.col("spec_id"),
+ entries.col("file_path"),
+ entries.col("file_size_in_bytes"),
+ entries.col("record_count"));
+ MakeDeleteFile makePosDeleteFn =
+ new MakeDeleteFile(true, Partitioning.partitionType(table), table.specs());
+    Dataset<DeleteFile> posDeletesToRemove =
+ posDeleteDs.map(makePosDeleteFn, Encoders.javaSerialization(DeleteFile.class));
+
+ removedDeleteFiles.addAll(posDeletesToRemove.collectAsList());
+
+ // Dangling equality delete files
+    Dataset<Row> eqDeleteDs =
+ entries
+ .filter("content == 2") // equality delete files
+ .join(minDataSeqNumberPerPartition, joinCond)
+ .filter("sequence_number <= min_data_sequence_number")
+ .select(
+ entries.col("partition"),
+ entries.col("spec_id"),
+ entries.col("file_path"),
+ entries.col("file_size_in_bytes"),
+ entries.col("record_count"));
+ MakeDeleteFile makeEqDeleteFn =
+ new MakeDeleteFile(false, Partitioning.partitionType(table), table.specs());
+    Dataset<DeleteFile> eqDeletesToRemove =
+ eqDeleteDs.map(makeEqDeleteFn, Encoders.javaSerialization(DeleteFile.class));
+
+ removedDeleteFiles.addAll(eqDeletesToRemove.collectAsList());
+ return removedDeleteFiles;
+ }
+
+  private static class MakeDeleteFile implements MapFunction<Row, DeleteFile> {
+
+ private final boolean posDeletes;
+ private final Types.StructType partitionType;
+    private final Map<Integer, PartitionSpec> specsById;
+
+ /**
+ * Map function that transforms entries table rows into {@link DeleteFile}
+ *
+ * @param posDeletes true for position deletes, false for equality deletes
+ * @param partitionType partition type of table
+ * @param specsById table's partition specs
+ */
+ MakeDeleteFile(
+        boolean posDeletes,
+        Types.StructType partitionType,
+        Map<Integer, PartitionSpec> specsById) {
+ this.posDeletes = posDeletes;
+ this.partitionType = partitionType;
+ this.specsById = specsById;
+ }
+
+ @Override
+ public DeleteFile call(Row row) throws Exception {
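+      // Columns are positional and must match the select() order in danglingDeletes():
+      // partition, spec_id, file_path, file_size_in_bytes, record_count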
+ PartitionData partition = new PartitionData(partitionType);
+ GenericRowWithSchema partitionRow = row.getAs(0);
+
+ for (int i = 0; i < partitionRow.length(); i++) {
+ partition.set(i, partitionRow.get(i));
+ }
+ int specId = row.getAs(1);
+ String path = row.getAs(2);
+ long fileSize = row.getAs(3);
+ long recordCount = row.getAs(4);
+
+ FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(specsById.get(specId));
+ if (posDeletes) {
+ builder.ofPositionDeletes();
+ } else {
+ builder.ofEqualityDeletes();
+ }
+
+ return builder
+ .withPath(path)
+ .withPartition(partition)
+ .withFileSizeInBytes(fileSize)
+ .withRecordCount(recordCount)
+ .build();
+ }
+ }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index b598bcb19bd8..5f0307bb4ce5 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -24,7 +24,6 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
@@ -57,7 +56,6 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
@@ -66,7 +64,6 @@
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,9 +79,6 @@
public class RewriteManifestsSparkAction
    extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {
- public static final String USE_CACHING = "use-caching";
- public static final boolean USE_CACHING_DEFAULT = true;
-
private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
  private final Encoder<ManifestFile> manifestEncoder;
@@ -266,27 +260,6 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
});
}
-  private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
- Dataset reusableDS;
- boolean useCaching =
- PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
- if (useCaching) {
- reusableDS = ds.cache();
- } else {
- int parallelism = SQLConf.get().numShufflePartitions();
- reusableDS =
-          ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
- }
-
- try {
- return func.apply(reusableDS);
- } finally {
- if (useCaching) {
- reusableDS.unpersist(false);
- }
- }
- }
-
  private List<ManifestFile> findMatchingManifests() {
Snapshot currentSnapshot = table.currentSnapshot();
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index 8c886adf510e..f7ab72dfcb01 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -91,4 +91,9 @@ public ExpireSnapshotsSparkAction expireSnapshots(Table table) {
public DeleteReachableFilesSparkAction deleteReachableFiles(String metadataLocation) {
return new DeleteReachableFilesSparkAction(spark, metadataLocation);
}
+
+ @Override
+ public RemoveDanglingDeletesSparkAction removeDanglingDeleteFiles(Table table) {
+ return new RemoveDanglingDeletesSparkAction(spark, table);
+ }
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
new file mode 100644
index 000000000000..212b8a815024
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.io.File;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Tuple2;
+
+public class TestRemoveDanglingDeleteAction extends SparkTestBase {
+
+ private static final HadoopTables TABLES = new HadoopTables(new Configuration());
+ private static final Schema SCHEMA =
+ new Schema(
+ optional(1, "c1", Types.StringType.get()),
+ optional(2, "c2", Types.StringType.get()),
+ optional(3, "c3", Types.StringType.get()));
+
+ private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
+ static final DataFile FILE_A =
+ DataFiles.builder(SPEC)
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=a") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_A2 =
+ DataFiles.builder(SPEC)
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=a") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_B =
+ DataFiles.builder(SPEC)
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=b") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_B2 =
+ DataFiles.builder(SPEC)
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=b") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_C =
+ DataFiles.builder(SPEC)
+ .withPath("/path/to/data-c.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=c") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_C2 =
+ DataFiles.builder(SPEC)
+ .withPath("/path/to/data-c.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=c") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_D =
+ DataFiles.builder(SPEC)
+ .withPath("/path/to/data-d.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=d") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_D2 =
+ DataFiles.builder(SPEC)
+ .withPath("/path/to/data-d.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=d") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_A_POS_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-a-pos-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=a") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_A2_POS_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-a2-pos-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=a") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_A_EQ_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofEqualityDeletes()
+ .withPath("/path/to/data-a-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=a") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_A2_EQ_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofEqualityDeletes()
+ .withPath("/path/to/data-a2-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=a") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_B_POS_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-b-pos-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=b") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_B2_POS_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-b2-pos-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=b") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_B_EQ_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofEqualityDeletes()
+ .withPath("/path/to/data-b-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=b") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_B2_EQ_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofEqualityDeletes()
+ .withPath("/path/to/data-b2-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=b") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+
+ static final DataFile FILE_UNPARTITIONED =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-unpartitioned.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_UNPARTITIONED_POS_DELETE =
+ FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+          .ofPositionDeletes()
+ .withPath("/path/to/data-unpartitioned-pos-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_UNPARTITIONED_EQ_DELETE =
+ FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+ .ofEqualityDeletes()
+ .withPath("/path/to/data-unpartitioned-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ private String tableLocation;
+ private Table table;
+
+ @Before
+ public void before() throws Exception {
+ File tableDir = temp.newFolder();
+ this.tableLocation = tableDir.toURI().toString();
+ }
+
+ @After
+ public void after() {
+ TABLES.dropTable(tableLocation);
+ }
+
+ private void setupNormalPartitionedTable() {
+ this.table =
+ TABLES.create(
+ SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), tableLocation);
+ }
+
+ private void setupUnpartitionedTable() {
+ this.table =
+ TABLES.create(
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"),
+ tableLocation);
+ }
+
+ @Test
+ public void testPartitionedDeletesWithLesserSeqNo() {
+ setupNormalPartitionedTable();
+
+ // Add Data Files
+ table.newAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+ // Add Delete Files
+ table
+ .newRowDelta()
+ .addDeletes(FILE_A_POS_DELETES)
+ .addDeletes(FILE_A2_POS_DELETES)
+ .addDeletes(FILE_B_POS_DELETES)
+ .addDeletes(FILE_B2_POS_DELETES)
+ .addDeletes(FILE_A_EQ_DELETES)
+ .addDeletes(FILE_A2_EQ_DELETES)
+ .addDeletes(FILE_B_EQ_DELETES)
+ .addDeletes(FILE_B2_EQ_DELETES)
+ .commit();
+
+ // Add More Data Files
+ table
+ .newAppend()
+ .appendFile(FILE_A2)
+ .appendFile(FILE_B2)
+ .appendFile(FILE_C2)
+ .appendFile(FILE_D2)
+ .commit();
+
+    List<Tuple2<Long, String>> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#entries")
+ .select("sequence_number", "data_file.file_path")
+ .sort("sequence_number", "data_file.file_path")
+ .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+ .collectAsList();
+    List<Tuple2<Long, String>> expected =
+ ImmutableList.of(
+ Tuple2.apply(1L, FILE_B.path().toString()),
+ Tuple2.apply(1L, FILE_C.path().toString()),
+ Tuple2.apply(1L, FILE_D.path().toString()),
+ Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+ Tuple2.apply(3L, FILE_A2.path().toString()),
+ Tuple2.apply(3L, FILE_B2.path().toString()),
+ Tuple2.apply(3L, FILE_C2.path().toString()),
+ Tuple2.apply(3L, FILE_D2.path().toString()));
+ Assert.assertEquals(expected, actual);
+
+ RemoveDanglingDeleteFiles.Result result =
+ SparkActions.get().removeDanglingDeleteFiles(table).execute();
+
+    // All delete files in FILE_A's partition should be removed because the partition's only
+    // data file (FILE_A2) has a higher sequence number than every delete file in it
+    Set<CharSequence> removedDeleteFiles =
+        result.removedDeleteFiles().stream().map(DeleteFile::path).collect(Collectors.toSet());
+    Assert.assertEquals("Expected four delete files removed", 4, removedDeleteFiles.size());
+ Assert.assertTrue(removedDeleteFiles.contains(FILE_A_POS_DELETES.path()));
+ Assert.assertTrue(removedDeleteFiles.contains(FILE_A2_POS_DELETES.path()));
+ Assert.assertTrue(removedDeleteFiles.contains(FILE_A_EQ_DELETES.path()));
+ Assert.assertTrue(removedDeleteFiles.contains(FILE_A2_EQ_DELETES.path()));
+
+    List<Tuple2<Long, String>> actualAfter =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#entries")
+ .filter("status < 2") // live files
+ .select("sequence_number", "data_file.file_path")
+ .sort("sequence_number", "data_file.file_path")
+ .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+ .collectAsList();
+    List<Tuple2<Long, String>> expectedAfter =
+ ImmutableList.of(
+ Tuple2.apply(1L, FILE_B.path().toString()),
+ Tuple2.apply(1L, FILE_C.path().toString()),
+ Tuple2.apply(1L, FILE_D.path().toString()),
+ Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+ Tuple2.apply(3L, FILE_A2.path().toString()),
+ Tuple2.apply(3L, FILE_B2.path().toString()),
+ Tuple2.apply(3L, FILE_C2.path().toString()),
+ Tuple2.apply(3L, FILE_D2.path().toString()));
+ Assert.assertEquals(expectedAfter, actualAfter);
+ }
+
+ @Test
+ public void testPartitionedDeletesWithEqSeqNo() {
+ setupNormalPartitionedTable();
+
+ // Add Data Files
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+ // Add Data Files with EQ and POS deletes
+ table
+ .newRowDelta()
+ .addRows(FILE_A2)
+ .addRows(FILE_B2)
+ .addRows(FILE_C2)
+ .addRows(FILE_D2)
+ .addDeletes(FILE_A_POS_DELETES)
+ .addDeletes(FILE_A2_POS_DELETES)
+ .addDeletes(FILE_A_EQ_DELETES)
+ .addDeletes(FILE_A2_EQ_DELETES)
+ .addDeletes(FILE_B_POS_DELETES)
+ .addDeletes(FILE_B2_POS_DELETES)
+ .addDeletes(FILE_B_EQ_DELETES)
+ .addDeletes(FILE_B2_EQ_DELETES)
+ .commit();
+
+    List<Tuple2<Long, String>> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#entries")
+ .select("sequence_number", "data_file.file_path")
+ .sort("sequence_number", "data_file.file_path")
+ .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+ .collectAsList();
+    List<Tuple2<Long, String>> expected =
+ ImmutableList.of(
+ Tuple2.apply(1L, FILE_A.path().toString()),
+ Tuple2.apply(1L, FILE_C.path().toString()),
+ Tuple2.apply(1L, FILE_D.path().toString()),
+ Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2.path().toString()),
+ Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2.path().toString()),
+ Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_C2.path().toString()),
+ Tuple2.apply(2L, FILE_D2.path().toString()));
+ Assert.assertEquals(expected, actual);
+
+ RemoveDanglingDeleteFiles.Result result =
+ SparkActions.get().removeDanglingDeleteFiles(table).execute();
+
+    // Equality delete files in FILE_B's partition should be removed because no data file in
+    // that partition has a strictly lower sequence number
+    Set<CharSequence> removedDeleteFiles =
+        result.removedDeleteFiles().stream().map(DeleteFile::path).collect(Collectors.toSet());
+    Assert.assertEquals("Expected two delete files removed", 2, removedDeleteFiles.size());
+ Assert.assertTrue(removedDeleteFiles.contains(FILE_B_EQ_DELETES.path()));
+ Assert.assertTrue(removedDeleteFiles.contains(FILE_B2_EQ_DELETES.path()));
+
+    List<Tuple2<Long, String>> actualAfter =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#entries")
+ .filter("status < 2") // live files
+ .select("sequence_number", "data_file.file_path")
+ .sort("sequence_number", "data_file.file_path")
+ .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+ .collectAsList();
+    List<Tuple2<Long, String>> expectedAfter =
+ ImmutableList.of(
+ Tuple2.apply(1L, FILE_A.path().toString()),
+ Tuple2.apply(1L, FILE_C.path().toString()),
+ Tuple2.apply(1L, FILE_D.path().toString()),
+ Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2.path().toString()),
+ Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2.path().toString()),
+ Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_C2.path().toString()),
+ Tuple2.apply(2L, FILE_D2.path().toString()));
+ Assert.assertEquals(expectedAfter, actualAfter);
+ }
+
+ @Test
+ public void testUnpartitionedTable() {
+ setupUnpartitionedTable();
+
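+    // Dangling deletes in unpartitioned tables are already cleaned up by ManifestFilterManager
+    // on each commit, so the action is expected to be a no-op here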
+ table
+ .newRowDelta()
+ .addDeletes(FILE_UNPARTITIONED_POS_DELETE)
+ .addDeletes(FILE_UNPARTITIONED_EQ_DELETE)
+ .commit();
+ table.newAppend().appendFile(FILE_UNPARTITIONED).commit();
+
+ RemoveDanglingDeleteFiles.Result result =
+ SparkActions.get().removeDanglingDeleteFiles(table).execute();
+ Assert.assertEquals("No-op for unpartitioned tables", 0, result.removedDeleteFiles().size());
+ }
+}