Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 33 additions & 63 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package org.apache.iceberg;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NotFoundException;
Expand All @@ -35,7 +35,6 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -180,16 +179,8 @@ private TableMetadata internalApply() {
}

Set<Long> idsToRetain = Sets.newHashSet();

// Compute the snapshots for each reference
Map<String, Set<Long>> refSnapshots = computeRefSnapshots(base.refs());

// Identify unreferenced snapshots which should be retained
Set<Long> unreferencedSnapshotsToRetain = computeUnreferencedSnapshotsToRetain(refSnapshots);
idsToRetain.addAll(unreferencedSnapshotsToRetain);

// Identify refs that should be removed
Map<String, SnapshotRef> retainedRefs = computeRetainedRefs(base.refs());
Map<String, SnapshotRef> retainedRefs = computeRetainedRefs(base.refs());
Map<Long, List<String>> retainedIdToRefs = Maps.newHashMap();
for (Map.Entry<String, SnapshotRef> retainedRefEntry : retainedRefs.entrySet()) {
long snapshotId = retainedRefEntry.getValue().snapshotId();
Expand All @@ -204,45 +195,24 @@ private TableMetadata internalApply() {
"Cannot expire %s. Still referenced by refs: %s", idToRemove, refsForId);
}

Set<Long> branchSnapshotsToRetain = computeAllBranchSnapshotsToRetain(retainedRefs, refSnapshots);
idsToRetain.addAll(branchSnapshotsToRetain);
idsToRetain.addAll(computeAllBranchSnapshotsToRetain(retainedRefs.values()));
idsToRetain.addAll(unreferencedSnapshotsToRetain(retainedRefs.values()));

TableMetadata.Builder updatedMetaBuilder = TableMetadata.buildFrom(base);

base.refs().keySet().stream()
.filter(ref -> !retainedRefs.containsKey(ref))
.forEach(updatedMetaBuilder::removeRef);

base.snapshots().stream()
.map(Snapshot::snapshotId)
.filter(snapshot -> !idsToRetain.contains(snapshot))
.forEach(idsToRemove::add);
updatedMetaBuilder.removeSnapshots(idsToRemove);

base.refs().keySet().stream()
.filter(ref -> !retainedRefs.containsKey(ref))
.forEach(updatedMetaBuilder::removeRef);

return updatedMetaBuilder.build();
}

/**
* Helper to compute the mapping of a ref to its snapshots. If it's a branch, the snapshots is an ordered set
* of all the snapshots on the branch. If it's a tag, the snapshot is a set of the single snapshot the tag refers to
*/
private Map<String, Set<Long>> computeRefSnapshots(Map<String, SnapshotRef> refs) {
Map<String, Set<Long>> refSnapshots = Maps.newHashMap();
for (Map.Entry<String, SnapshotRef> refEntry : refs.entrySet()) {
String name = refEntry.getKey();
SnapshotRef ref = refEntry.getValue();
if (ref.isBranch()) {
Set<Long> branchAncestors = Sets.newLinkedHashSet();
Iterable<Snapshot> snapshots = SnapshotUtil.ancestorsOf(ref.snapshotId(), base::snapshot);
snapshots.forEach(snapshot -> branchAncestors.add(snapshot.snapshotId()));
refSnapshots.put(name, branchAncestors);
} else {
refSnapshots.put(name, ImmutableSet.of(ref.snapshotId()));
}
}

return refSnapshots;
}

private Map<String, SnapshotRef> computeRetainedRefs(Map<String, SnapshotRef> refs) {
Map<String, SnapshotRef> retainedRefs = Maps.newHashMap();
for (Map.Entry<String, SnapshotRef> refEntry : refs.entrySet()) {
Expand All @@ -268,47 +238,28 @@ private Map<String, SnapshotRef> computeRetainedRefs(Map<String, SnapshotRef> re
return retainedRefs;
}

private Set<Long> computeUnreferencedSnapshotsToRetain(Map<String, Set<Long>> refSnapshots) {
Set<Long> referencedSnapshots = Sets.newHashSet();
refSnapshots.values().forEach(referencedSnapshots::addAll);

return base.snapshots().stream()
.filter(snapshot -> !referencedSnapshots.contains(snapshot.snapshotId()))
.filter(snapshot -> snapshot.timestampMillis() >= defaultExpireOlderThan)
.map(Snapshot::snapshotId)
.collect(Collectors.toSet());
}

private Set<Long> computeAllBranchSnapshotsToRetain(
Map<String, SnapshotRef> refs,
Map<String, Set<Long>> refSnapshots) {

private Set<Long> computeAllBranchSnapshotsToRetain(Collection<SnapshotRef> refs) {
Set<Long> branchSnapshotsToRetain = Sets.newHashSet();
for (Map.Entry<String, SnapshotRef> refEntry : refs.entrySet()) {
final String name = refEntry.getKey();
final SnapshotRef ref = refEntry.getValue();

for (SnapshotRef ref : refs) {
if (ref.isBranch()) {
long expireSnapshotsOlderThan = ref.maxSnapshotAgeMs() != null ? now - ref.maxSnapshotAgeMs() :
defaultExpireOlderThan;
int minSnapshotsToKeep = ref.minSnapshotsToKeep() != null ? ref.minSnapshotsToKeep() :
defaultMinNumSnapshots;
Set<Long> branchAncestors = refSnapshots.get(name);
branchSnapshotsToRetain.addAll(
computeBranchSnapshotsToRetain(branchAncestors, expireSnapshotsOlderThan, minSnapshotsToKeep));
computeBranchSnapshotsToRetain(ref.snapshotId(), expireSnapshotsOlderThan, minSnapshotsToKeep));
}
}

return branchSnapshotsToRetain;
}

private Set<Long> computeBranchSnapshotsToRetain(
Set<Long> branchSnapshots,
long snapshot,
long expireSnapshotsOlderThan,
int minSnapshotsToKeep) {
Set<Long> idsToRetain = Sets.newHashSet();
for (long snapshot : branchSnapshots) {
Snapshot ancestor = base.snapshot(snapshot);
for (Snapshot ancestor : SnapshotUtil.ancestorsOf(snapshot, base::snapshot)) {
if (idsToRetain.size() < minSnapshotsToKeep || ancestor.timestampMillis() >= expireSnapshotsOlderThan) {
idsToRetain.add(ancestor.snapshotId());
} else {
Expand All @@ -319,6 +270,25 @@ private Set<Long> computeBranchSnapshotsToRetain(
return idsToRetain;
}

private Set<Long> unreferencedSnapshotsToRetain(Collection<SnapshotRef> refs) {
Set<Long> referencedSnapshots = Sets.newHashSet();
for (SnapshotRef ref : refs) {
for (Snapshot snapshot : SnapshotUtil.ancestorsOf(ref.snapshotId(), base::snapshot)) {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this works too, I built the complex map before hand so that the ancestors didn't need to be traversed again and could be maintained in memory for the later computation, but maybe that's a overkill (and also consumes even more memory)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't that expensive. We could also pass a set into the branch logic to update, but that would require changes there and would use an input variable as output, so I'd prefer not to.

referencedSnapshots.add(snapshot.snapshotId());
}
}

Set<Long> snapshotsToRetain = Sets.newHashSet();
for (Snapshot snapshot : base.snapshots()) {
if (!referencedSnapshots.contains(snapshot.snapshotId()) && // unreferenced
snapshot.timestampMillis() >= defaultExpireOlderThan) { // not old enough to expire
snapshotsToRetain.add(snapshot.snapshotId());
}
}

return snapshotsToRetain;
}

@Override
public void commit() {
Tasks.foreach(ops)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,15 @@ static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node)

// parse properties map
Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
long currentSnapshotId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node);

Map<String, SnapshotRef> refs;
if (node.has(REFS)) {
refs = refsFromJson(node.get(REFS));
} else if (currentSnapshotId != -1) {
// initialize the main branch if there are no refs
refs = ImmutableMap.of(SnapshotRef.MAIN_BRANCH, SnapshotRef.branchBuilder(currentSnapshotId).build());

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's not directly related to the expiration logic, but Is initializing the main branch when parsing sufficient for setting the main ref on the next commit? I was thinking it would have to be set in the builder.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should follow up with a more thorough fix, but this is a good start.

} else {
refs = ImmutableMap.of();
}
Expand Down Expand Up @@ -457,7 +460,7 @@ static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node)

return new TableMetadata(metadataLocation, formatVersion, uuid, location,
lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, currentSchemaId, schemas, defaultSpecId, specs,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentVersionId,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, entries.build(), metadataEntries.build(), refs,
ImmutableList.of() /* no changes from the file */);
}
Expand Down
Loading