diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
index 15dbb07daac2..a8fb72661fe0 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
@@ -75,7 +75,20 @@ public abstract class BinPackStrategy implements RewriteStrategy {
public static final String MAX_FILE_SIZE_BYTES = "max-file-size-bytes";
public static final double MAX_FILE_SIZE_DEFAULT_RATIO = 1.80d;
+ /**
+ * The minimum number of deletes that needs to be associated with a data file for it to be considered for rewriting.
+ * If a data file has at least this number of deletes, it will be rewritten regardless of its file size determined
+ * by {@link #MIN_FILE_SIZE_BYTES} and {@link #MAX_FILE_SIZE_BYTES}.
+ * If a file group contains a file that satisfies this condition, the file group will be rewritten regardless of
+ * the number of files in the file group determined by {@link #MIN_INPUT_FILES}.
+ *
+ * Defaults to Integer.MAX_VALUE, which means this feature is not enabled by default.
+ */
+ public static final String MIN_DELETES_PER_FILE = "min-deletes-per-file";
+ public static final int MIN_DELETES_PER_FILE_DEFAULT = Integer.MAX_VALUE;
+
private int minInputFiles;
+ private int minDeletesPerFile;
private long minFileSize;
private long maxFileSize;
private long targetFileSize;
@@ -90,6 +103,7 @@ public String name() {
public Set validOptions() {
return ImmutableSet.of(
MIN_INPUT_FILES,
+ MIN_DELETES_PER_FILE,
MIN_FILE_SIZE_BYTES,
MAX_FILE_SIZE_BYTES
);
@@ -120,6 +134,10 @@ public RewriteStrategy options(Map options) {
MIN_INPUT_FILES,
MIN_INPUT_FILES_DEFAULT);
+ minDeletesPerFile = PropertyUtil.propertyAsInt(options,
+ MIN_DELETES_PER_FILE,
+ MIN_DELETES_PER_FILE_DEFAULT);
+
validateOptions();
return this;
}
@@ -127,7 +145,8 @@ public RewriteStrategy options(Map options) {
@Override
public Iterable selectFilesToRewrite(Iterable dataFiles) {
return FluentIterable.from(dataFiles)
- .filter(scanTask -> scanTask.length() < minFileSize || scanTask.length() > maxFileSize);
+ .filter(scanTask -> scanTask.length() < minFileSize || scanTask.length() > maxFileSize ||
+ taskHasTooManyDeletes(scanTask));
}
@Override
@@ -135,7 +154,8 @@ public Iterable> planFileGroups(Iterable dataFi
ListPacker packer = new BinPacking.ListPacker<>(maxGroupSize, 1, false);
List> potentialGroups = packer.pack(dataFiles, FileScanTask::length);
return potentialGroups.stream().filter(group ->
- group.size() >= minInputFiles || sizeOfInputFiles(group) > targetFileSize
+ group.size() >= minInputFiles || sizeOfInputFiles(group) > targetFileSize ||
+ group.stream().anyMatch(this::taskHasTooManyDeletes)
).collect(Collectors.toList());
}
@@ -218,6 +238,10 @@ private long sizeOfInputFiles(List group) {
return group.stream().mapToLong(FileScanTask::length).sum();
}
+ private boolean taskHasTooManyDeletes(FileScanTask task) {
+ return task.deletes() != null && task.deletes().size() >= minDeletesPerFile;
+ }
+
private void validateOptions() {
Preconditions.checkArgument(minFileSize >= 0,
"Cannot set %s to a negative number, %d < 0",
@@ -238,5 +262,9 @@ private void validateOptions() {
Preconditions.checkArgument(minInputFiles > 0,
"Cannot set %s is less than 1. All values less than 1 have the same effect as 1. %d < 1",
MIN_INPUT_FILES, minInputFiles);
+
+ Preconditions.checkArgument(minDeletesPerFile > 0,
+ "Cannot set %s to less than 1. All values less than 1 have the same effect as 1. %s < 1",
+ MIN_DELETES_PER_FILE, minDeletesPerFile);
}
}
diff --git a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java
index 0a16088b3f66..aef8065b68b7 100644
--- a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java
+++ b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java
@@ -47,6 +47,17 @@ public static MockFileScanTask mockTask(long length, int sortOrderId) {
return new MockFileScanTask(mockFile);
}
+ public static MockFileScanTask mockTaskWithDeletes(long length, int nDeletes) {
+ DeleteFile[] mockDeletes = new DeleteFile[nDeletes];
+ for (int i = 0; i < nDeletes; i++) {
+ mockDeletes[i] = Mockito.mock(DeleteFile.class);
+ }
+
+ DataFile mockFile = Mockito.mock(DataFile.class);
+ Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length);
+ return new MockFileScanTask(mockFile, mockDeletes);
+ }
+
@Override
public long length() {
return length;
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
index 69cbe8f4652b..47c578c85870 100644
--- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
+++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
@@ -33,6 +33,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -107,6 +108,22 @@ public void testFilteringCustomMinMaxFileSize() {
Assert.assertEquals("Should remove files that exceed or are smaller than new bounds", expectedFiles, filtered);
}
+ @Test
+ public void testFilteringWithDeletes() {
+ RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
+ BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(550 * MB),
+ BinPackStrategy.MIN_FILE_SIZE_BYTES, Long.toString(490 * MB),
+ BinPackStrategy.MIN_DELETES_PER_FILE, Integer.toString(2)
+ ));
+
+ List testFiles = filesOfSize(500, 500, 480, 480, 560, 520);
+ testFiles.add(MockFileScanTask.mockTaskWithDeletes(500 * MB, 2));
+ Iterable expectedFiles = filesOfSize(480, 480, 560, 500);
+ Iterable filtered = ImmutableList.copyOf(strategy.selectFilesToRewrite(testFiles));
+
+ Assert.assertEquals("Should include file with deletes", expectedFiles, filtered);
+ }
+
@Test
public void testGroupingMinInputFilesInvalid() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
@@ -150,6 +167,23 @@ public void testGroupingMinInputFilesValid() {
ImmutableList.of(testFiles), grouped);
}
+ @Test
+ public void testGroupingWithDeletes() {
+ RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
+ BinPackStrategy.MIN_INPUT_FILES, Integer.toString(5),
+ BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(550 * MB),
+ BinPackStrategy.MIN_FILE_SIZE_BYTES, Long.toString(490 * MB),
+ BinPackStrategy.MIN_DELETES_PER_FILE, Integer.toString(2)
+ ));
+
+ List testFiles = Lists.newArrayList();
+ testFiles.add(MockFileScanTask.mockTaskWithDeletes(500 * MB, 2));
+ Iterable> grouped = strategy.planFileGroups(testFiles);
+
+ Assert.assertEquals("Should plan 1 group since the file has enough deletes",
+ ImmutableList.of(testFiles), grouped);
+ }
+
@Test
public void testMaxGroupSize() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
@@ -196,12 +230,18 @@ public void testInvalidOptions() {
BinPackStrategy.MIN_FILE_SIZE_BYTES, Long.toString(1000 * MB)));
});
- AssertHelpers.assertThrows("Should not allow min input size smaller tha 1",
+ AssertHelpers.assertThrows("Should not allow min input size smaller than 1",
IllegalArgumentException.class, () -> {
defaultBinPack().options(ImmutableMap.of(
BinPackStrategy.MIN_INPUT_FILES, Long.toString(-5)));
});
+ AssertHelpers.assertThrows("Should not allow min deletes per file smaller than 1",
+ IllegalArgumentException.class, () -> {
+ defaultBinPack().options(ImmutableMap.of(
+ BinPackStrategy.MIN_DELETES_PER_FILE, Long.toString(-5)));
+ });
+
AssertHelpers.assertThrows("Should not allow negative target size",
IllegalArgumentException.class, () -> {
defaultBinPack().options(ImmutableMap.of(
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
index bd982dd2bbc5..2cd1af67c17b 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
@@ -26,14 +26,18 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
@@ -46,6 +50,12 @@
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -177,6 +187,59 @@ public void testBinPackWithFilter() {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
+ @Test
+ public void testBinPackWithDeletes() throws Exception {
+ Table table = createTablePartitioned(4, 2);
+ table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+ shouldHaveFiles(table, 8);
+ table.refresh();
+
+ CloseableIterable tasks = table.newScan().planFiles();
+ List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+ GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(),
+ null, null, null);
+ int total = (int) dataFiles.stream().mapToLong(ContentFile::recordCount).sum();
+
+ RowDelta rowDelta = table.newRowDelta();
+ // remove 2 rows for odd files, 1 row for even files
+ for (int i = 0; i < dataFiles.size(); i++) {
+ DataFile dataFile = dataFiles.get(i);
+ EncryptedOutputFile outputFile = EncryptedFiles.encryptedOutput(
+ table.io().newOutputFile(table.locationProvider().newDataLocation(UUID.randomUUID().toString())),
+ EncryptionKeyMetadata.EMPTY);
+ PositionDeleteWriter posDeleteWriter = appenderFactory.newPosDeleteWriter(
+ outputFile, FileFormat.PARQUET, dataFile.partition());
+ posDeleteWriter.delete(dataFile.path(), 0);
+ posDeleteWriter.close();
+ rowDelta.addDeletes(posDeleteWriter.toDeleteFile());
+
+ if (i % 2 != 0) {
+ outputFile = EncryptedFiles.encryptedOutput(
+ table.io().newOutputFile(table.locationProvider().newDataLocation(UUID.randomUUID().toString())),
+ EncryptionKeyMetadata.EMPTY);
+ posDeleteWriter = appenderFactory.newPosDeleteWriter(outputFile, FileFormat.PARQUET, dataFile.partition());
+ posDeleteWriter.delete(dataFile.path(), 1);
+ posDeleteWriter.close();
+ rowDelta.addDeletes(posDeleteWriter.toDeleteFile());
+ }
+ }
+
+ rowDelta.commit();
+ table.refresh();
+ List