Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected FileCleanupStrategy(
.select(
"manifest_path", "manifest_length", "added_snapshot_id", "deleted_data_files_count");

protected CloseableIterable<ManifestFile> readManifestFiles(Snapshot snapshot) {
protected CloseableIterable<ManifestFile> readManifests(Snapshot snapshot) {
if (snapshot.manifestListLocation() != null) {
return Avro.read(fileIO.newInputFile(snapshot.manifestListLocation()))
.rename("manifest_file", GenericManifestFile.class.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
exc))
.run(
snapshot -> {
try (CloseableIterable<ManifestFile> manifests = readManifestFiles(snapshot)) {
try (CloseableIterable<ManifestFile> manifests = readManifests(snapshot)) {
for (ManifestFile manifest : manifests) {
validManifests.add(manifest.path());

Expand Down Expand Up @@ -210,7 +210,7 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
}

// find any manifests that are no longer needed
try (CloseableIterable<ManifestFile> manifests = readManifestFiles(snapshot)) {
try (CloseableIterable<ManifestFile> manifests = readManifests(snapshot)) {
for (ManifestFile manifest : manifests) {
if (!validManifests.contains(manifest.path())) {
manifestsToDelete.add(manifest.path());
Expand Down
106 changes: 73 additions & 33 deletions core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,39 +63,83 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
}
}
}

Set<ManifestFile> candidateManifestFilesForDeletion = readManifests(expiredSnapshots);
Set<ManifestFile> manifestFilesAfterExpiration = readManifests(snapshotsAfterExpiration);

Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
for (ManifestFile candidateManifestFile : candidateManifestFilesForDeletion) {
if (!manifestFilesAfterExpiration.contains(candidateManifestFile)) {
manifestsToDelete.add(candidateManifestFile);
Set<ManifestFile> deletionCandidates = readManifests(expiredSnapshots);

if (!deletionCandidates.isEmpty()) {
Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
Set<ManifestFile> manifestsToDelete =
pruneReferencedManifests(
snapshotsAfterExpiration, deletionCandidates, currentManifests::add);

if (!manifestsToDelete.isEmpty()) {
Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
deleteFiles(dataFilesToDelete, "data");
Set<String> manifestPathsToDelete =
manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet());
deleteFiles(manifestPathsToDelete, "manifest");
}
}

Set<String> dataFilesToDelete =
findFilesToDelete(manifestsToDelete, manifestFilesAfterExpiration);
deleteFiles(dataFilesToDelete, "data");
Set<String> manifestPathsToDelete =
manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet());

deleteFiles(manifestPathsToDelete, "manifest");
deleteFiles(manifestListsToDelete, "manifest list");
}

/**
 * Removes from the deletion candidates every manifest that is still listed by one of the given
 * snapshots, and returns what is left.
 *
 * <p>The candidates are first copied into a separate concurrent set, so the {@code
 * deletionCandidates} argument itself is not modified. Each snapshot is then processed as a task
 * submitted through {@code planExecutorService}; the task chain is configured with {@code
 * retry(3)}, {@code stopOnFailure()} and {@code throwFailureWhenFinished()}, and failures are
 * logged at WARN level with the snapshot id.
 *
 * <p>For every manifest read from a snapshot, the manifest is removed from the candidate set. If
 * that leaves the set empty, the task for that snapshot returns immediately without invoking the
 * callback for that manifest or for any later one; otherwise a copy of the manifest is handed to
 * {@code currentManifestCallback}. As a consequence, the callback is only guaranteed to have
 * seen every manifest of every snapshot when the returned set is non-empty.
 *
 * @param snapshots snapshots whose manifests must be kept
 * @param deletionCandidates manifests that are candidates for deletion; read but not modified
 * @param currentManifestCallback receives a copy of each manifest read while candidates remain
 * @return the candidates that were not found in any of the given snapshots
 * @throws RuntimeIOException if closing the manifest iterable of a snapshot fails
 */
private Set<ManifestFile> pruneReferencedManifests(
Set<Snapshot> snapshots,
Set<ManifestFile> deletionCandidates,
Consumer<ManifestFile> currentManifestCallback) {
// Work on a concurrent copy: the per-snapshot tasks below are handed to an executor service and
// all of them mutate this set.
Set<ManifestFile> candidateSet = ConcurrentHashMap.newKeySet();
candidateSet.addAll(deletionCandidates);
Tasks.foreach(snapshots)
.retry(3)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(planExecutorService)
.onFailure(
(snapshot, exc) ->
LOG.warn(
"Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
.run(
snapshot -> {
try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
for (ManifestFile manifestFile : manifestFiles) {
// A manifest still listed by this snapshot must not be deleted.
candidateSet.remove(manifestFile);
// Nothing left to delete: stop reading this snapshot. This only ends the current
// task; tasks for other snapshots hit the same check after their first manifest.
if (candidateSet.isEmpty()) {
return;
}

// NOTE(review): the copy is presumably needed because the reader may reuse the
// ManifestFile instance between iterations -- confirm against the reader.
currentManifestCallback.accept(manifestFile.copy());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very minor, but it seems weird to copy here rather than in the callback. If the callback were a noop, we'd be doing work for nothing. Since we know it needs to be copied, it seems fine though.

}
} catch (IOException e) {
throw new RuntimeIOException(
e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
}
});

return candidateSet;
}

/**
 * Collects a copy of every manifest listed by the given snapshots into a single set.
 *
 * <p>NOTE(review): this span comes from a diff view without +/- markers, so it shows two
 * implementations back to back: a sequential loop that calls {@code readManifestFiles(snapshot)}
 * and fills a {@code Sets.newHashSet()}, followed by a {@code Tasks}-based version that calls
 * {@code readManifests(snapshot)} and fills a {@code ConcurrentHashMap.newKeySet()}. Presumably
 * the first is the removed code and the second its replacement -- confirm against the actual
 * file before relying on either.
 *
 * @param snapshots snapshots whose manifest lists are read
 * @return a set containing a copy of each manifest read from the snapshots
 * @throws RuntimeIOException if closing the manifest iterable of a snapshot fails
 */
private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
// Sequential variant: one snapshot after another on the calling thread.
Set<ManifestFile> manifestFiles = Sets.newHashSet();
for (Snapshot snapshot : snapshots) {
try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
for (ManifestFile manifestFile : manifestFilesForSnapshot) {
manifestFiles.add(manifestFile.copy());
}
} catch (IOException e) {
throw new RuntimeIOException(
e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
}
}
// Task-based variant: each snapshot is a task submitted through planExecutorService, configured
// with retry(3), stopOnFailure() and throwFailureWhenFinished(); results go into a concurrent
// set because several tasks add to it.
Set<ManifestFile> manifestFiles = ConcurrentHashMap.newKeySet();
Tasks.foreach(snapshots)
.retry(3)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(planExecutorService)
.onFailure(
(snapshot, exc) ->
LOG.warn(
"Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
.run(
snapshot -> {
try (CloseableIterable<ManifestFile> manifests = readManifests(snapshot)) {
for (ManifestFile manifestFile : manifests) {
// Store a copy rather than the instance handed out by the iterable.
manifestFiles.add(manifestFile.copy());
}
} catch (IOException e) {
throw new RuntimeIOException(
e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
}
});

return manifestFiles;
}
Expand All @@ -112,9 +156,7 @@ private Set<String> findFilesToDelete(
.onFailure(
(item, exc) ->
LOG.warn(
"Failed to determine live files in manifest {}: this may cause orphaned data files",
item.path(),
exc))
"Failed to determine live files in manifest {}. Retrying", item.path(), exc))
.run(
manifest -> {
try (CloseableIterable<String> paths = ManifestFiles.readPaths(manifest, fileIO)) {
Expand All @@ -137,9 +179,7 @@ private Set<String> findFilesToDelete(
.onFailure(
(item, exc) ->
LOG.warn(
"Failed to determine live files in manifest {}: this may cause orphaned data files",
item.path(),
exc))
"Failed to determine live files in manifest {}. Retrying", item.path(), exc))
.run(
manifest -> {
if (filesToDelete.isEmpty()) {
Expand All @@ -155,7 +195,7 @@ private Set<String> findFilesToDelete(
});

} catch (Throwable e) {
LOG.warn("Failed to determine the data files to be removed", e);
LOG.warn("Failed to list all reachable files", e);
return Sets.newHashSet();
}

Expand Down