Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 136 additions & 21 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.iceberg;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -34,6 +36,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.DateTimeUtil;
Expand All @@ -54,6 +57,8 @@
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.MAX_REF_AGE_MS;
import static org.apache.iceberg.TableProperties.MAX_REF_AGE_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.MAX_SNAPSHOT_AGE_MS;
import static org.apache.iceberg.TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.MIN_SNAPSHOTS_TO_KEEP;
Expand All @@ -75,32 +80,40 @@ public void accept(String file) {

private final TableOperations ops;
private final Set<Long> idsToRemove = Sets.newHashSet();
private final long now;
private final long defaultMaxRefAgeMs;
private boolean cleanExpiredFiles = true;
private TableMetadata base;
private long expireOlderThan;
private int minNumSnapshots;
private long defaultExpireOlderThan;
private int defaultMinNumSnapshots;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();

RemoveSnapshots(TableOperations ops) {
this.ops = ops;
this.base = ops.current();
ValidationException.check(
PropertyUtil.propertyAsBoolean(base.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)");

long maxSnapshotAgeMs = PropertyUtil.propertyAsLong(
long defaultMaxSnapshotAgeMs = PropertyUtil.propertyAsLong(
base.properties(),
MAX_SNAPSHOT_AGE_MS,
MAX_SNAPSHOT_AGE_MS_DEFAULT);
this.expireOlderThan = System.currentTimeMillis() - maxSnapshotAgeMs;

this.minNumSnapshots = PropertyUtil.propertyAsInt(
this.now = System.currentTimeMillis();
this.defaultExpireOlderThan = now - defaultMaxSnapshotAgeMs;
this.defaultMinNumSnapshots = PropertyUtil.propertyAsInt(
base.properties(),
MIN_SNAPSHOTS_TO_KEEP,
MIN_SNAPSHOTS_TO_KEEP_DEFAULT);

ValidationException.check(
PropertyUtil.propertyAsBoolean(base.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)");
this.defaultMaxRefAgeMs = PropertyUtil.propertyAsLong(
base.properties(),
MAX_REF_AGE_MS,
MAX_REF_AGE_MS_DEFAULT
);
}

@Override
Expand All @@ -120,15 +133,15 @@ public ExpireSnapshots expireSnapshotId(long expireSnapshotId) {
public ExpireSnapshots expireOlderThan(long timestampMillis) {
LOG.info("Expiring snapshots older than: {} ({})",
DateTimeUtil.formatTimestampMillis(timestampMillis), timestampMillis);
this.expireOlderThan = timestampMillis;
this.defaultExpireOlderThan = timestampMillis;
return this;
}

@Override
public ExpireSnapshots retainLast(int numSnapshots) {
Preconditions.checkArgument(1 <= numSnapshots,
"Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
this.minNumSnapshots = numSnapshots;
this.defaultMinNumSnapshots = numSnapshots;
return this;
}

Expand Down Expand Up @@ -161,21 +174,119 @@ public List<Snapshot> apply() {

private TableMetadata internalApply() {
this.base = ops.refresh();
if (base.snapshots().isEmpty()) {
return base;
}

Set<Long> idsToRetain = Sets.newHashSet();
List<Long> ancestorIds = SnapshotUtil.ancestorIds(base.currentSnapshot(), base::snapshot);
if (minNumSnapshots >= ancestorIds.size()) {
idsToRetain.addAll(ancestorIds);
} else {
idsToRetain.addAll(ancestorIds.subList(0, minNumSnapshots));
// Identify refs that should be removed
Map<String, SnapshotRef> retainedRefs = computeRetainedRefs(base.refs());
Map<Long, List<String>> retainedIdToRefs = Maps.newHashMap();
for (Map.Entry<String, SnapshotRef> retainedRefEntry : retainedRefs.entrySet()) {
long snapshotId = retainedRefEntry.getValue().snapshotId();
retainedIdToRefs.putIfAbsent(snapshotId, Lists.newArrayList());
retainedIdToRefs.get(snapshotId).add(retainedRefEntry.getKey());
idsToRetain.add(snapshotId);
}

for (long idToRemove : idsToRemove) {
List<String> refsForId = retainedIdToRefs.get(idToRemove);
Preconditions.checkArgument(refsForId == null,
"Cannot expire %s. Still referenced by refs: %s", idToRemove, refsForId);
}

idsToRetain.addAll(computeAllBranchSnapshotsToRetain(retainedRefs.values()));
idsToRetain.addAll(unreferencedSnapshotsToRetain(retainedRefs.values()));

TableMetadata.Builder updatedMetaBuilder = TableMetadata.buildFrom(base);

base.refs().keySet().stream()
.filter(ref -> !retainedRefs.containsKey(ref))
.forEach(updatedMetaBuilder::removeRef);

base.snapshots().stream()
.map(Snapshot::snapshotId)
.filter(snapshot -> !idsToRetain.contains(snapshot))
.forEach(idsToRemove::add);
updatedMetaBuilder.removeSnapshots(idsToRemove);

return updatedMetaBuilder.build();
}

private Map<String, SnapshotRef> computeRetainedRefs(Map<String, SnapshotRef> refs) {
Map<String, SnapshotRef> retainedRefs = Maps.newHashMap();
for (Map.Entry<String, SnapshotRef> refEntry : refs.entrySet()) {
String name = refEntry.getKey();
SnapshotRef ref = refEntry.getValue();
if (name.equals(SnapshotRef.MAIN_BRANCH)) {
retainedRefs.put(name, ref);
continue;
}

Snapshot snapshot = base.snapshot(ref.snapshotId());
long maxRefAgeMs = ref.maxRefAgeMs() != null ? ref.maxRefAgeMs() : defaultMaxRefAgeMs;
if (snapshot != null) {
long refAgeMs = now - snapshot.timestampMillis();
if (refAgeMs <= maxRefAgeMs) {
retainedRefs.put(name, ref);
}
} else {
LOG.warn("Removing invalid ref {}: snapshot {} does not exist", name, ref.snapshotId());
}
}

TableMetadata updateMeta = base.removeSnapshotsIf(snapshot ->
idsToRemove.contains(snapshot.snapshotId()) ||
(snapshot.timestampMillis() < expireOlderThan && !idsToRetain.contains(snapshot.snapshotId())));
List<Snapshot> updateSnapshots = updateMeta.snapshots();
List<Snapshot> baseSnapshots = base.snapshots();
return updateSnapshots.size() != baseSnapshots.size() ? updateMeta : base;
return retainedRefs;
}

private Set<Long> computeAllBranchSnapshotsToRetain(Collection<SnapshotRef> refs) {
Set<Long> branchSnapshotsToRetain = Sets.newHashSet();
for (SnapshotRef ref : refs) {
if (ref.isBranch()) {
long expireSnapshotsOlderThan = ref.maxSnapshotAgeMs() != null ? now - ref.maxSnapshotAgeMs() :
defaultExpireOlderThan;
int minSnapshotsToKeep = ref.minSnapshotsToKeep() != null ? ref.minSnapshotsToKeep() :
defaultMinNumSnapshots;
branchSnapshotsToRetain.addAll(
computeBranchSnapshotsToRetain(ref.snapshotId(), expireSnapshotsOlderThan, minSnapshotsToKeep));
}
}

return branchSnapshotsToRetain;
}

private Set<Long> computeBranchSnapshotsToRetain(
long snapshot,
long expireSnapshotsOlderThan,
int minSnapshotsToKeep) {
Set<Long> idsToRetain = Sets.newHashSet();
for (Snapshot ancestor : SnapshotUtil.ancestorsOf(snapshot, base::snapshot)) {
if (idsToRetain.size() < minSnapshotsToKeep || ancestor.timestampMillis() >= expireSnapshotsOlderThan) {
idsToRetain.add(ancestor.snapshotId());
} else {
return idsToRetain;
}
}

return idsToRetain;
}

private Set<Long> unreferencedSnapshotsToRetain(Collection<SnapshotRef> refs) {
Set<Long> referencedSnapshots = Sets.newHashSet();
for (SnapshotRef ref : refs) {
for (Snapshot snapshot : SnapshotUtil.ancestorsOf(ref.snapshotId(), base::snapshot)) {
referencedSnapshots.add(snapshot.snapshotId());
}
}

Set<Long> snapshotsToRetain = Sets.newHashSet();
for (Snapshot snapshot : base.snapshots()) {
if (!referencedSnapshots.contains(snapshot.snapshotId()) && // unreferenced
snapshot.timestampMillis() >= defaultExpireOlderThan) { // not old enough to expire
snapshotsToRetain.add(snapshot.snapshotId());
}
}

return snapshotsToRetain;
}

@Override
Expand All @@ -190,6 +301,10 @@ public void commit() {
.onlyRetryOn(CommitFailedException.class)
.run(item -> {
TableMetadata updated = internalApply();
if (cleanExpiredFiles && updated.refs().size() > 1) {
throw new UnsupportedOperationException("Cannot incrementally clean files for tables with more than 1 ref");
Copy link
Contributor

@zinking zinking Oct 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amogh-jahagirdar I have a simple use case blocked by this line.

I am tagging one of the snapshots on the mainline as never expire, but as soon as I tag it, the normal expire procedure (expire with clean) stopped working, as there are two refs now.

is the constraint correct here ? because in my scenario, I am merely indicating SKIP this snapshot, with other terms stay the same.

Copy link
Contributor

@zinking zinking Oct 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, if there's only one branch main, and tags are all on main, it is safe to carry on.
is that correct ? otherwise when is the expired file supposed to be cleanedUp ? through removeOrphan?

}

ops.commit(base, updated);
});
LOG.info("Committed snapshot changes");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,15 @@ static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node)

// parse properties map
Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
long currentSnapshotId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node);

Map<String, SnapshotRef> refs;
if (node.has(REFS)) {
refs = refsFromJson(node.get(REFS));
} else if (currentSnapshotId != -1) {
// initialize the main branch if there are no refs
refs = ImmutableMap.of(SnapshotRef.MAIN_BRANCH, SnapshotRef.branchBuilder(currentSnapshotId).build());
} else {
refs = ImmutableMap.of();
}
Expand Down Expand Up @@ -457,7 +460,7 @@ static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node)

return new TableMetadata(metadataLocation, formatVersion, uuid, location,
lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, currentSchemaId, schemas, defaultSpecId, specs,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentVersionId,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, entries.build(), metadataEntries.build(), refs,
ImmutableList.of() /* no changes from the file */);
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ private TableProperties() {
public static final String MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep";
public static final int MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1;

public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms";
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE;

public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";

Expand Down
Loading