23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
@@ -21,10 +21,12 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,4 +85,25 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
(file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
.run(deleteFunc::accept);
}

protected Set<String> expiredStatisticsFilesLocations(
TableMetadata beforeExpiration, TableMetadata afterExpiration) {
Set<String> statsFileLocationsBeforeExpiration = statsFileLocations(beforeExpiration);
Set<String> statsFileLocationsAfterExpiration = statsFileLocations(afterExpiration);

return Sets.difference(statsFileLocationsBeforeExpiration, statsFileLocationsAfterExpiration);
}

private Set<String> statsFileLocations(TableMetadata tableMetadata) {
Set<String> statsFileLocations = Sets.newHashSet();

if (tableMetadata.statisticsFiles() != null) {
statsFileLocations =
tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.collect(Collectors.toSet());
}

return statsFileLocations;
}
}
@@ -27,7 +27,6 @@
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
@@ -260,10 +259,14 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) {
findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);

deleteFiles(filesToDelete, "data");
LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete));
deleteFiles(manifestsToDelete, "manifest");
deleteFiles(manifestListsToDelete, "manifest list");

if (!beforeExpiration.statisticsFiles().isEmpty()) {
Set<String> expiredStatisticsFilesLocations =
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
}
Comment on lines +265 to +269 (Contributor): Nit: I think we could just have expiredStatisticsFilesLocations short-circuit before computing in case the pre-expiration statistics files are empty, rather than having the if (!beforeExpiration.statisticsFiles().isEmpty()) check in both cleanup implementations. (Sketched below, after this file's diff.)

}

private Set<String> findFilesToDelete(
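The nit above could be handled once in the shared FileCleanupStrategy helper instead of at each call site. A minimal sketch of that refactor, assuming expiredStatisticsFilesLocations stays in FileCleanupStrategy as shown in the first file (an illustration of the suggestion, not code from this PR):

  protected Set<String> expiredStatisticsFilesLocations(
      TableMetadata beforeExpiration, TableMetadata afterExpiration) {
    // Short-circuit: nothing to diff when the pre-expiration metadata carries no statistics files.
    if (beforeExpiration.statisticsFiles() == null
        || beforeExpiration.statisticsFiles().isEmpty()) {
      return Sets.newHashSet();
    }

    return Sets.difference(
        statsFileLocations(beforeExpiration), statsFileLocations(afterExpiration));
  }

Both cleanup implementations could then call deleteFiles(expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files") unconditionally, since deleteFiles over an empty set is a no-op.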
@@ -81,6 +81,11 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) {
}

deleteFiles(manifestListsToDelete, "manifest list");

if (!beforeExpiration.statisticsFiles().isEmpty()) {
deleteFiles(
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files");
}
}

private Set<ManifestFile> pruneReferencedManifests(
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -1211,6 +1211,7 @@ public Builder removeSnapshots(Collection<Long> idsToRemove) {
if (idsToRemove.contains(snapshotId)) {
snapshotsById.remove(snapshotId);
changes.add(new MetadataUpdate.RemoveSnapshot(snapshotId));
removeStatistics(snapshotId);
} else {
retainedSnapshots.add(snapshot);
}
126 changes: 126 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -18,20 +18,29 @@
*/
package org.apache.iceberg;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@@ -1234,6 +1243,81 @@ public void testMultipleRefsAndCleanExpiredFilesFailsForIncrementalCleanup() {
.commit());
}

@Test
public void testExpireWithStatisticsFiles() throws IOException {
table.newAppend().appendFile(FILE_A).commit();
String statsFileLocation1 = statsFileLocation(table.location());
StatisticsFile statisticsFile1 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
statsFileLocation1,
table.io());
commitStats(table, statisticsFile1);

table.newAppend().appendFile(FILE_B).commit();
String statsFileLocation2 = statsFileLocation(table.location());
StatisticsFile statisticsFile2 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
statsFileLocation2,
table.io());
commitStats(table, statisticsFile2);
Assert.assertEquals("Should have 2 statistics file", 2, table.statisticsFiles().size());

long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis());
removeSnapshots(table).expireOlderThan(tAfterCommits).commit();

// only the current snapshot and its stats file should be retained
Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots()));
Assertions.assertThat(table.statisticsFiles())
.hasSize(1)
.extracting(StatisticsFile::snapshotId)
.as("Should contain only the statistics file of snapshot2")
.isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId()));

Assertions.assertThat(new File(statsFileLocation1).exists()).isFalse();
Assertions.assertThat(new File(statsFileLocation2).exists()).isTrue();
}

@Test
public void testExpireWithStatisticsFilesWithReuse() throws IOException {
table.newAppend().appendFile(FILE_A).commit();
String statsFileLocation1 = statsFileLocation(table.location());
StatisticsFile statisticsFile1 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
statsFileLocation1,
table.io());
commitStats(table, statisticsFile1);

table.newAppend().appendFile(FILE_B).commit();
// If an expired snapshot's stats file is reused for some reason by the live snapshots,
// that stats file should not get deleted from the file system as the live snapshots still
// reference it.
StatisticsFile statisticsFile2 =
reuseStatsFile(table.currentSnapshot().snapshotId(), statisticsFile1);
commitStats(table, statisticsFile2);

Assert.assertEquals("Should have 2 statistics file", 2, table.statisticsFiles().size());

long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis());
removeSnapshots(table).expireOlderThan(tAfterCommits).commit();

// only the current snapshot and its stats file (reused from previous snapshot) should be
// retained
Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots()));
Assertions.assertThat(table.statisticsFiles())
.hasSize(1)
.extracting(StatisticsFile::snapshotId)
.as("Should contain only the statistics file of snapshot2")
.isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId()));
// the reused stats file should exist.
Assertions.assertThat(new File(statsFileLocation1).exists()).isTrue();
}

@Test
public void testFailRemovingSnapshotWhenStillReferencedByBranch() {
table.newAppend().appendFile(FILE_A).commit();
@@ -1515,4 +1599,46 @@ private RemoveSnapshots removeSnapshots(Table table) {
RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots();
return (RemoveSnapshots) removeSnapshots.withIncrementalCleanup(incrementalCleanup);
}

private StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
puffinWriter.add(
new Blob(
"some-blob-type",
ImmutableList.of(1),
snapshotId,
snapshotSequenceNumber,
ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
puffinWriter.finish();

return new GenericStatisticsFile(
snapshotId,
statsLocation,
puffinWriter.fileSize(),
puffinWriter.footerSize(),
puffinWriter.writtenBlobsMetadata().stream()
.map(GenericBlobMetadata::from)
.collect(ImmutableList.toImmutableList()));
}
}

private StatisticsFile reuseStatsFile(long snapshotId, StatisticsFile statisticsFile) {
return new GenericStatisticsFile(
snapshotId,
statisticsFile.path(),
statisticsFile.fileSizeInBytes(),
statisticsFile.fileFooterSizeInBytes(),
statisticsFile.blobMetadata());
}

private void commitStats(Table table, StatisticsFile statisticsFile) {
table.updateStatistics().setStatistics(statisticsFile.snapshotId(), statisticsFile).commit();
Comment (Contributor): I find it odd that a single line like this is in a separate method. Seems like this could be inlined, which would make the tests more readable. (The inlined form is sketched after this method.)

}
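A minimal sketch of the inlining suggested in the comment above, using statisticsFile1 from the first new test as the example (illustration only, not part of the PR's diff):

    // Inlined at each call site instead of routing through commitStats():
    table.updateStatistics().setStatistics(statisticsFile1.snapshotId(), statisticsFile1).commit();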

private String statsFileLocation(String tableLocation) {
String statsFileName = "stats-file-" + UUID.randomUUID();
return tableLocation + "/metadata/" + statsFileName;
}
}
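For callers, the effect of this change can be summarized with a short usage sketch: expiring snapshots with file cleanup enabled also deletes statistics (Puffin) files that no retained snapshot references anymore. The cutoffTimestampMillis variable below is hypothetical; cleanExpiredFiles(true) is the default and is shown only for emphasis.

    // Snapshots older than the cutoff are expired; their data files, manifests, manifest
    // lists, and now their orphaned statistics files are deleted as part of cleanup.
    table
        .expireSnapshots()
        .expireOlderThan(cutoffTimestampMillis)
        .cleanExpiredFiles(true)
        .commit();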