Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@
import java.util.function.Consumer;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("checkstyle:VisibilityModifier")
abstract class FileCleanupStrategy {
private final Consumer<String> defaultDeleteFunc =
new Consumer<>() {
@Override
public void accept(String file) {
fileIO.deleteFile(file);
}
};

private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class);

protected final FileIO fileIO;
Expand Down Expand Up @@ -75,14 +85,32 @@ protected CloseableIterable<ManifestFile> readManifests(Snapshot snapshot) {
}

protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
Tasks.foreach(pathsToDelete)
.executeWith(deleteExecutorService)
.retry(3)
.stopRetryOn(NotFoundException.class)
.suppressFailureWhenFinished()
.onFailure(
(file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
.run(deleteFunc::accept);
if (deleteFunc == null && fileIO instanceof SupportsBulkOperations) {
try {
((SupportsBulkOperations) fileIO).deleteFiles(pathsToDelete);
} catch (BulkDeletionFailureException e) {
LOG.warn(
"Bulk deletion failed for {} of {} {} file(s)",
e.numberFailedObjects(),
pathsToDelete.size(),
fileType,
e);
} catch (RuntimeException e) {
LOG.warn("Bulk deletion failed", e);
}
} else {
Consumer<String> deleteFuncToUse = deleteFunc == null ? defaultDeleteFunc : deleteFunc;

Tasks.foreach(pathsToDelete)
.executeWith(deleteExecutorService)
.retry(3)
.stopRetryOn(NotFoundException.class)
.stopOnFailure()
.suppressFailureWhenFinished()
.onFailure(
(file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
.run(deleteFuncToUse::accept);
}
}

protected boolean hasAnyStatisticsFiles(TableMetadata tableMetadata) {
Expand Down
10 changes: 1 addition & 9 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ class RemoveSnapshots implements ExpireSnapshots {
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE =
MoreExecutors.newDirectExecutorService();

private final Consumer<String> defaultDelete =
new Consumer<String>() {
@Override
public void accept(String file) {
ops.io().deleteFile(file);
}
};

private final TableOperations ops;
private final Set<Long> idsToRemove = Sets.newHashSet();
private final long now;
Expand All @@ -81,7 +73,7 @@ public void accept(String file) {
private TableMetadata base;
private long defaultExpireOlderThan;
private int defaultMinNumSnapshots;
private Consumer<String> deleteFunc = defaultDelete;
private Consumer<String> deleteFunc = null;
private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
private Boolean incrementalCleanup;
Expand Down
94 changes: 94 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.times;

import java.io.File;
import java.io.IOException;
Expand All @@ -40,8 +41,10 @@
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
Expand Down Expand Up @@ -1625,6 +1628,83 @@ public void testRetainFilesOnRetainedBranches() {
assertThat(deletedFiles).isEqualTo(expectedDeletes);
}

@TestTemplate
public void testRemoveFromTableWithBulkIO() {
TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get this approach.
If we create our own FileIO operation, why do we use Mockio to spy?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just use a TestFileIO which does the counting, and then we can forget about the Mockito in this test?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's cleaner to use a Mockito spy on these functions and perform the verification steps on the spy. The custom FileIO was needed to have a test FileIO that derives from SupportsBulkOperations and has a deleteFiles() function. For sure we could also introduce a Set in our FileIO implementation that collects the paths received as param, also we could introduce a counter that counts how many time the deleteFiles() function was called, etc., and also we could separate the received paths by call of this function, but that is unnecessarily written code as Mockito already gives this for us.
See L1700-1703: we verify that deleteFiles() was called 3 times, and we could verify the given paths broken down by each call of the function. With this we could see that even in a scenario where we get an exception during deletion, we still call the deleteFiles() function for all the other file types too.

I prefer the current implementation compared to write custom code for verification, but I might miss something here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use something like this then:

    SupportsBulkOperations bulkFileIO = Mockito.mock(SupportsBulkOperations.class, withSettings().extraInterfaces(FileIO.class));

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, @pvary ! I tried this out, but apparently it's not suitable for the tests to have a FileIO completely mocked, it would need at least a TestFileIO instance to be able to make operations on the test tables. In this case a very early step (tableWithBulkIO.newAppend().appendFile(FILE_A).commit();) fails if there is no FileIO instance just a mock under the table.


Mockito.doNothing().when(spyFileIO).deleteFiles(any());

runBulkDeleteTest(spyFileIO);
}

@TestTemplate
public void testBulkDeletionWithBulkDeletionFailureException() {
TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());

Mockito.doThrow(new BulkDeletionFailureException(2))
.doNothing()
.when(spyFileIO)
.deleteFiles(any());

runBulkDeleteTest(spyFileIO);
}

@TestTemplate
public void testBulkDeletionWithRuntimeException() {
TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());

Mockito.doThrow(new RuntimeException("Exception when bulk deleting"))
.doNothing()
.when(spyFileIO)
.deleteFiles(any());

runBulkDeleteTest(spyFileIO);
}

private void runBulkDeleteTest(TestBulkLocalFileIO spyFileIO) {
String tableName = "tableWithBulkIO";
Table tableWithBulkIO =
TestTables.create(
tableDir,
tableName,
SCHEMA,
SPEC,
SortOrder.unsorted(),
formatVersion,
new TestTables.TestTableOperations(tableName, tableDir, spyFileIO));

tableWithBulkIO.newAppend().appendFile(FILE_A).commit();

Set<String> deletedManifestLists =
Sets.newHashSet(tableWithBulkIO.currentSnapshot().manifestListLocation());
Set<String> deletedManifests =
tableWithBulkIO.currentSnapshot().allManifests(table.io()).stream()
.map(ManifestFile::path)
.collect(Collectors.toSet());

tableWithBulkIO.newDelete().deleteFile(FILE_A).commit();

deletedManifestLists.add(tableWithBulkIO.currentSnapshot().manifestListLocation());
deletedManifests.addAll(
tableWithBulkIO.currentSnapshot().allManifests(table.io()).stream()
.map(ManifestFile::path)
.collect(Collectors.toSet()));

tableWithBulkIO.newAppend().appendFile(FILE_B).commit();

long lastSnapshotId = tableWithBulkIO.currentSnapshot().snapshotId();

removeSnapshots(tableWithBulkIO).expireOlderThan(System.currentTimeMillis()).commit();

assertThat(tableWithBulkIO.currentSnapshot().snapshotId()).isEqualTo(lastSnapshotId);
assertThat(tableWithBulkIO.snapshots()).containsOnly(tableWithBulkIO.currentSnapshot());

Mockito.verify(spyFileIO, times(3)).deleteFiles(any());
Mockito.verify(spyFileIO).deleteFiles(Set.of(FILE_A.location()));
Mockito.verify(spyFileIO).deleteFiles(deletedManifestLists);
Mockito.verify(spyFileIO).deleteFiles(deletedManifests);
}

@TestTemplate
public void testRemoveSpecDuringExpiration() {
DataFile file =
Expand Down Expand Up @@ -1867,4 +1947,18 @@ private static PartitionStatisticsFile reusePartitionStatsFile(
private static void commitPartitionStats(Table table, PartitionStatisticsFile statisticsFile) {
table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
}

private static class TestBulkLocalFileIO extends TestTables.LocalFileIO
implements SupportsBulkOperations {

@Override
public void deleteFile(String path) {
throw new RuntimeException("Expected to call the bulk delete interface.");
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
throw new RuntimeException("Expected to mock this function");
}
}
}