Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,23 @@ public List<Snapshot> apply() {

private TableMetadata internalApply() {
this.base = ops.refresh();

// Keeping track of reverse linkage i.e. if a snapshot which is sourced by other (Ex. cherry-pick) then that should
// also be removed from Table, else the link would be removed from snapshot history, and when expiring that
// source snapshot later, we might delete underlying dataFiles which are commit as part of this expiring snapshot
// NOTE we don't have to do this for `expireOlderThan` flag, since we know that `source-snapshot-id` will be expired
// first in that case.
List<Long> sourceIdsToRemove = Lists.newArrayList();
for (Long idToRemove : idsToRemove) {
Snapshot snapshotToRemove = base.snapshot(idToRemove);
if (snapshotToRemove != null) {
String snapshotId = snapshotToRemove.summary().getOrDefault(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, null);
if (snapshotId != null) {
sourceIdsToRemove.add(Long.valueOf(snapshotId));
}
}
}
return base.removeSnapshotsIf(snapshot ->
idsToRemove.contains(snapshot.snapshotId()) ||
idsToRemove.contains(snapshot.snapshotId()) || sourceIdsToRemove.contains(snapshot.snapshotId()) ||
(expireOlderThan != null && snapshot.timestampMillis() < expireOlderThan));
}

Expand Down Expand Up @@ -169,6 +183,9 @@ private void cleanExpiredFiles(List<Snapshot> snapshots, Set<Long> validIds, Set
// only remove files that were deleted in an ancestor of the current table state to avoid
// physically deleting files that were logically deleted in a commit that was rolled back.
Set<Long> ancestorIds = Sets.newHashSet(SnapshotUtil.ancestorIds(base.currentSnapshot(), base::snapshot));
//This is the source snapshots which are being refered by all the valid ancestors
Set<Long> publishedSourceIds = Sets
.newHashSet(SnapshotUtil.publishedSourceSnapshotIds(ancestorIds, base::snapshot));

Set<String> validManifests = Sets.newHashSet();
Set<String> manifestsToScan = Sets.newHashSet();
Expand Down Expand Up @@ -214,14 +231,18 @@ private void cleanExpiredFiles(List<Snapshot> snapshots, Set<Long> validIds, Set
}

if (!isFromAncestor && isFromExpiringSnapshot && manifest.hasAddedFiles()) {
// Because the manifest was written by a snapshot that is not an ancestor of the
// current table state, the files added in this manifest can be removed. The extra
// check whether the manifest was written by a known snapshot that was expired in
// this commit ensures that the full ancestor list between when the snapshot was
// written and this expiration is known and there is no missing history. If history
// were missing, then the snapshot could be an ancestor of the table state but the
// ancestor ID set would not contain it and this would be unsafe.
manifestsToRevert.add(manifest.path());
// Checking if the dangling snapshot is not referred by a published snapshot. We can't delete the
// underlying files, which are published by another snapshot due to operations like CherryPick
if (!publishedSourceIds.contains(snapshotId)) {
// Because the manifest was written by a snapshot that is not an ancestor of the
// current table state, the files added in this manifest can be removed. The extra
// check whether the manifest was written by a known snapshot that was expired in
// this commit ensures that the full ancestor list between when the snapshot was
// written and this expiration is known and there is no missing history. If history
// were missing, then the snapshot could be an ancestor of the table state but the
// ancestor ID set would not contain it and this would be unsafe.
manifestsToRevert.add(manifest.path());
}
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;

public class SnapshotUtil {
Expand Down Expand Up @@ -55,4 +57,16 @@ public static List<Long> ancestorIds(Snapshot snapshot, Function<Long, Snapshot>
}
return ancestorIds;
}

public static List<Long> publishedSourceSnapshotIds(Set<Long> ancestorIds, Function<Long, Snapshot> lookup) {
List<Long> sourceSnapshotIds = Lists.newArrayList();
for (Long id : ancestorIds) {
Snapshot current = lookup.apply(id);
String sourceSnapshotId = current.summary().getOrDefault(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, null);
if (sourceSnapshotId != null) {
sourceSnapshotIds.add(Long.valueOf(sourceSnapshotId));
}
}
return sourceSnapshotIds;
}
}
8 changes: 5 additions & 3 deletions core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
Expand Down Expand Up @@ -97,11 +99,13 @@ TestTableOperations ops() {

private static final Map<String, TableMetadata> METADATA = Maps.newHashMap();
private static final Map<String, Integer> VERSIONS = Maps.newHashMap();
public static final List<String> DELETED_FILE_PATHS = Lists.newArrayList();

static void clearTables() {
synchronized (METADATA) {
METADATA.clear();
VERSIONS.clear();
DELETED_FILE_PATHS.clear();
}
}

Expand Down Expand Up @@ -218,9 +222,7 @@ public OutputFile newOutputFile(String path) {

@Override
public void deleteFile(String path) {
if (!new File(path).delete()) {
throw new RuntimeIOException("Failed to delete file: " + path);
}
DELETED_FILE_PATHS.add(path);
}
}
}
77 changes: 77 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
import org.apache.iceberg.exceptions.DuplicateWAPCommitException;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -688,4 +689,80 @@ public void testNonWapCherrypick() {
table.manageSnapshots().cherrypick(firstSnapshotId).commit();
});
}

@Test
public void testWithExpiringCherryPick() {
table.newAppend()
.appendFile(FILE_A)
.commit();
TableMetadata base = readMetadata();
// first WAP commit
table.newAppend()
.appendFile(FILE_B)
.set(SnapshotSummary.STAGED_WAP_ID_PROP, "123456789")
.stageOnly()
.commit();
table.refresh();

// pick the snapshot that's staged but not committed
Snapshot wap1Snapshot = null;
for (Snapshot s : table.snapshots()) {
if ("123456789".equalsIgnoreCase(s.summary()
.getOrDefault(SnapshotSummary.STAGED_WAP_ID_PROP, null))) {
wap1Snapshot = s;
break;
}
}
Assert.assertNotNull(wap1Snapshot);

// table has new commit so that cherrypick can come into affect.
table.newAppend()
.appendFile(FILE_C)
.commit();
table.refresh();

// cherry-pick first snapshot
table.manageSnapshots().cherrypick(wap1Snapshot.snapshotId()).commit();
table.refresh();
Snapshot wap1CherryPickSnapshot = table.currentSnapshot();
Assert.assertEquals("123456789", wap1CherryPickSnapshot.summary().getOrDefault(SnapshotSummary.PUBLISHED_WAP_ID_PROP,
null));
Assert.assertEquals(wap1CherryPickSnapshot.summary().getOrDefault(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP,
null), String.valueOf(wap1Snapshot.snapshotId()));


table.expireSnapshots().expireSnapshotId(wap1Snapshot.snapshotId()).commit();
table.expireSnapshots().expireSnapshotId(wap1CherryPickSnapshot.snapshotId()).commit();

Lists.newArrayList(wap1Snapshot, wap1CherryPickSnapshot).forEach(i -> {
i.addedFiles().forEach(item -> {
Assert.assertFalse(TestTables.DELETED_FILE_PATHS.contains(item.path().toString()));
});
});

// second WAP commit
table.newAppend()
.appendFile(FILE_D)
.set("wap.id", "987654321")
.stageOnly()
.commit();
table.refresh();

Snapshot wap2Snapshot = null;
for (Snapshot s : table.snapshots()) {
if ("987654321".equalsIgnoreCase(s.summary()
.getOrDefault(SnapshotSummary.STAGED_WAP_ID_PROP, null))) {
wap2Snapshot = s;
break;
}
}
Assert.assertNotNull(wap2Snapshot);
final long wap2SnapshotId = wap2Snapshot.snapshotId();
table.expireSnapshots().expireSnapshotId(wap2SnapshotId).commit();

wap2Snapshot.addedFiles().forEach(item -> {
Assert.assertTrue(TestTables.DELETED_FILE_PATHS.contains(item.path().toString()));
});
}

}