32 changes: 30 additions & 2 deletions core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
@@ -75,7 +75,20 @@ public abstract class BinPackStrategy implements RewriteStrategy {
public static final String MAX_FILE_SIZE_BYTES = "max-file-size-bytes";
public static final double MAX_FILE_SIZE_DEFAULT_RATIO = 1.80d;

/**
* The minimum number of delete files that must be associated with a data file for it to be considered for
* rewriting. If a data file has at least this many deletes, it will be rewritten regardless of the file size
* bounds set by {@link #MIN_FILE_SIZE_BYTES} and {@link #MAX_FILE_SIZE_BYTES}.
* If a file group contains a file that satisfies this condition, the group will be rewritten regardless of the
* minimum number of input files set by {@link #MIN_INPUT_FILES}.
* <p>
* Defaults to Integer.MAX_VALUE, which effectively disables this feature.
*/
public static final String MIN_DELETES_PER_FILE = "min-deletes-per-file";
Collaborator:
What value is recommended to the user? Or how does the user compute the proper value? I'm thinking of maybe adding an option that counts a file group as valid if any file in it contains deletes, because you don't know how many records match an equality delete (for example, one that deletes a whole set of records in an area/province).

Nit: This is disabled by default, but the comparison always runs.

Member:

I think it's fine to just do this based on the number of delete files, since the read penalty is directly related to the number of files and less so to the number of rows actually deleted.

No strong feeling on the default. Since we already have the number of delete files in the task information, I don't think the check is very expensive.

Collaborator:

Also, do we need separate settings for equality deletes and position deletes, since the read penalties are different?

Contributor Author:

I think that is equivalent to setting this value to 0?

Contributor Author:

I think the exact value to set depends on the user's tolerance for read performance: more deletes mean worse read performance and can even cause out-of-memory failures, so users can tune this value based on their system requirements.

Collaborator:

> I think that is equivalent to setting this value to 0?

Hm, that's it.
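
To make the tuning discussion concrete, here is a minimal sketch of how a user would set this option through the rewrite action, mirroring the .option(...) calls in the tests below. The SparkActions entry point and the threshold of 1 (the "rewrite any file with deletes" behavior discussed above) are illustrative assumptions, not part of this PR:

// Illustrative sketch only: a threshold of 1 asks the strategy to rewrite every
// data file that has at least one delete file attached, regardless of file size.
RewriteDataFiles.Result result = SparkActions.get()
    .rewriteDataFiles(table)
    .option(BinPackStrategy.MIN_DELETES_PER_FILE, "1")
    .execute();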

public static final int MIN_DELETES_PER_FILE_DEFAULT = Integer.MAX_VALUE;

private int minInputFiles;
private int minDeletesPerFile;
private long minFileSize;
private long maxFileSize;
private long targetFileSize;
@@ -90,6 +103,7 @@ public String name() {
public Set<String> validOptions() {
return ImmutableSet.of(
MIN_INPUT_FILES,
MIN_DELETES_PER_FILE,
MIN_FILE_SIZE_BYTES,
MAX_FILE_SIZE_BYTES
);
@@ -120,22 +134,28 @@ public RewriteStrategy options(Map<String, String> options) {
MIN_INPUT_FILES,
MIN_INPUT_FILES_DEFAULT);

minDeletesPerFile = PropertyUtil.propertyAsInt(options,
MIN_DELETES_PER_FILE,
MIN_DELETES_PER_FILE_DEFAULT);

validateOptions();
return this;
}

@Override
public Iterable<FileScanTask> selectFilesToRewrite(Iterable<FileScanTask> dataFiles) {
return FluentIterable.from(dataFiles)
.filter(scanTask -> scanTask.length() < minFileSize || scanTask.length() > maxFileSize);
.filter(scanTask -> scanTask.length() < minFileSize || scanTask.length() > maxFileSize ||
taskHasTooManyDeletes(scanTask));
}

@Override
public Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> dataFiles) {
ListPacker<FileScanTask> packer = new BinPacking.ListPacker<>(maxGroupSize, 1, false);
List<List<FileScanTask>> potentialGroups = packer.pack(dataFiles, FileScanTask::length);
return potentialGroups.stream().filter(group ->
group.size() >= minInputFiles || sizeOfInputFiles(group) > targetFileSize
group.size() >= minInputFiles || sizeOfInputFiles(group) > targetFileSize ||
group.stream().anyMatch(this::taskHasTooManyDeletes)
).collect(Collectors.toList());
}

@@ -218,6 +238,10 @@ private long sizeOfInputFiles(List<FileScanTask> group) {
return group.stream().mapToLong(FileScanTask::length).sum();
}

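// A data file qualifies once the number of delete files attached to its scan task
// reaches the configured threshold; deletes() is guarded because it may be null.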
private boolean taskHasTooManyDeletes(FileScanTask task) {
return task.deletes() != null && task.deletes().size() >= minDeletesPerFile;
}

private void validateOptions() {
Preconditions.checkArgument(minFileSize >= 0,
"Cannot set %s to a negative number, %s < 0",
@@ -238,5 +262,9 @@ private void validateOptions() {
Preconditions.checkArgument(minInputFiles > 0,
"Cannot set %s to less than 1. All values less than 1 have the same effect as 1. %s < 1",
MIN_INPUT_FILES, minInputFiles);

Preconditions.checkArgument(minDeletesPerFile > 0,
"Cannot set %s to less than 1. All values less than 1 have the same effect as 1. %s < 1",
MIN_DELETES_PER_FILE, minDeletesPerFile);
}
}
11 changes: 11 additions & 0 deletions core/src/test/java/org/apache/iceberg/MockFileScanTask.java
@@ -47,6 +47,17 @@ public static MockFileScanTask mockTask(long length, int sortOrderId) {
return new MockFileScanTask(mockFile);
}

public static MockFileScanTask mockTaskWithDeletes(long length, int nDeletes) {
DeleteFile[] mockDeletes = new DeleteFile[nDeletes];
for (int i = 0; i < nDeletes; i++) {
mockDeletes[i] = Mockito.mock(DeleteFile.class);
}

DataFile mockFile = Mockito.mock(DataFile.class);
Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length);
return new MockFileScanTask(mockFile, mockDeletes);
}

@Override
public long length() {
return length;
core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
@@ -33,6 +33,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -107,6 +108,22 @@ public void testFilteringCustomMinMaxFileSize() {
Assert.assertEquals("Should remove files that exceed or are smaller than new bounds", expectedFiles, filtered);
}

@Test
public void testFilteringWithDeletes() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(550 * MB),
BinPackStrategy.MIN_FILE_SIZE_BYTES, Long.toString(490 * MB),
BinPackStrategy.MIN_DELETES_PER_FILE, Integer.toString(2)
));

List<FileScanTask> testFiles = filesOfSize(500, 500, 480, 480, 560, 520);
testFiles.add(MockFileScanTask.mockTaskWithDeletes(500 * MB, 2));
Iterable<FileScanTask> expectedFiles = filesOfSize(480, 480, 560, 500);
Iterable<FileScanTask> filtered = ImmutableList.copyOf(strategy.selectFilesToRewrite(testFiles));

Assert.assertEquals("Should include file with deletes", expectedFiles, filtered);
}

@Test
public void testGroupingMinInputFilesInvalid() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
@@ -150,6 +167,23 @@ public void testGroupingMinInputFilesValid() {
ImmutableList.of(testFiles), grouped);
}

@Test
public void testGroupingWithDeletes() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
BinPackStrategy.MIN_INPUT_FILES, Integer.toString(5),
BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(550 * MB),
BinPackStrategy.MIN_FILE_SIZE_BYTES, Long.toString(490 * MB),
BinPackStrategy.MIN_DELETES_PER_FILE, Integer.toString(2)
Contributor:
@kbendick, can you check our checkstyle config? It looks like lines that have incorrect indentation are getting through.

));

List<FileScanTask> testFiles = Lists.newArrayList();
testFiles.add(MockFileScanTask.mockTaskWithDeletes(500 * MB, 2));
Iterable<List<FileScanTask>> grouped = strategy.planFileGroups(testFiles);

Assert.assertEquals("Should plan 1 groups since there are enough input files",
ImmutableList.of(testFiles), grouped);
}

@Test
public void testMaxGroupSize() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
@@ -196,12 +230,18 @@ public void testInvalidOptions() {
BinPackStrategy.MIN_FILE_SIZE_BYTES, Long.toString(1000 * MB)));
});

AssertHelpers.assertThrows("Should not allow min input size smaller tha 1",
AssertHelpers.assertThrows("Should not allow min input size smaller than 1",
IllegalArgumentException.class, () -> {
defaultBinPack().options(ImmutableMap.of(
BinPackStrategy.MIN_INPUT_FILES, Long.toString(-5)));
});

AssertHelpers.assertThrows("Should not allow min deletes per file smaller than 1",
IllegalArgumentException.class, () -> {
defaultBinPack().options(ImmutableMap.of(
BinPackStrategy.MIN_DELETES_PER_FILE, Long.toString(-5)));
});

AssertHelpers.assertThrows("Should not allow negative target size",
IllegalArgumentException.class, () -> {
defaultBinPack().options(ImmutableMap.of(
@@ -26,14 +26,18 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
@@ -46,6 +50,12 @@
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.actions.SortStrategy;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -177,6 +187,59 @@ public void testBinPackWithFilter() {
assertEquals("Rows must match", expectedRecords, actualRecords);
}

@Test
public void testBinPackWithDeletes() throws Exception {
Table table = createTablePartitioned(4, 2);
Contributor:
Nit: Would it be worthwhile to add inline comments to explain these parameters (like createTablePartitioned(4 /* ??? */, 2 /* ??? */))? Right now, it's hard to tell immediately what these arguments are, but I'll leave the choice to you.

Contributor Author:
I think this is not needed, because you can see the meaning of the parameters in IntelliJ.
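
For illustration, the reviewer's suggestion would read roughly as follows; the parameter meanings are an assumption inferred from shouldHaveFiles(table, 8) below, not something stated in the PR:

Table table = createTablePartitioned(4 /* partitions */, 2 /* files per partition */);  // assumed meanings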

table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
shouldHaveFiles(table, 8);
table.refresh();

CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(),
null, null, null);
int total = (int) dataFiles.stream().mapToLong(ContentFile::recordCount).sum();

RowDelta rowDelta = table.newRowDelta();
// remove 2 rows for odd files, 1 row for even files
for (int i = 0; i < dataFiles.size(); i++) {
DataFile dataFile = dataFiles.get(i);
EncryptedOutputFile outputFile = EncryptedFiles.encryptedOutput(
table.io().newOutputFile(table.locationProvider().newDataLocation(UUID.randomUUID().toString())),
EncryptionKeyMetadata.EMPTY);
PositionDeleteWriter<Record> posDeleteWriter = appenderFactory.newPosDeleteWriter(
outputFile, FileFormat.PARQUET, dataFile.partition());
posDeleteWriter.delete(dataFile.path(), 0);
posDeleteWriter.close();
rowDelta.addDeletes(posDeleteWriter.toDeleteFile());

if (i % 2 != 0) {
outputFile = EncryptedFiles.encryptedOutput(
table.io().newOutputFile(table.locationProvider().newDataLocation(UUID.randomUUID().toString())),
EncryptionKeyMetadata.EMPTY);
posDeleteWriter = appenderFactory.newPosDeleteWriter(outputFile, FileFormat.PARQUET, dataFile.partition());
posDeleteWriter.delete(dataFile.path(), 1);
posDeleteWriter.close();
rowDelta.addDeletes(posDeleteWriter.toDeleteFile());
}
Contributor Author:
I know there is some repeated code here for generating deletes. So far I'm still not sure what the correct boundary is for creating util methods; I'm planning to refactor after I add more tests for the RewriteDeleteStrategy. (A possible shape for such a helper is sketched right after this test.)

}

rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
Result result = actions().rewriteDataFiles(table)
.option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0")
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
.option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE))
.option(BinPackStrategy.MIN_DELETES_PER_FILE, "4")
.execute();
Assert.assertEquals("Action should rewrite 4 data files", 4, result.rewrittenDataFilesCount());

List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);
Assert.assertEquals("12 rows are removed", total - 12, actualRecords.size());
}
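
Following up on the author's note about extracting a util method: a minimal sketch of what such a helper could look like, assembled only from calls already used in this test. The helper's name and signature are hypothetical, not part of the PR:

// Hypothetical helper, not part of this PR: writes one position-delete file that
// removes the given row positions from a single data file, and returns it for
// use with RowDelta#addDeletes.
private DeleteFile writePosDeletes(Table table, GenericAppenderFactory appenderFactory,
                                   DataFile dataFile, long... positions) throws IOException {
  EncryptedOutputFile outputFile = EncryptedFiles.encryptedOutput(
      table.io().newOutputFile(table.locationProvider().newDataLocation(UUID.randomUUID().toString())),
      EncryptionKeyMetadata.EMPTY);
  PositionDeleteWriter<Record> posDeleteWriter = appenderFactory.newPosDeleteWriter(
      outputFile, FileFormat.PARQUET, dataFile.partition());
  try {
    for (long position : positions) {
      posDeleteWriter.delete(dataFile.path(), position);
    }
  } finally {
    posDeleteWriter.close();
  }
  return posDeleteWriter.toDeleteFile();
}

With it, the delete-generation loop above would collapse to roughly rowDelta.addDeletes(writePosDeletes(table, appenderFactory, dataFile, 0)), plus a second call with position 1 for odd-indexed files.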

@Test
public void testRewriteLargeTableHasResiduals() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();