diff --git a/api/src/main/java/org/apache/iceberg/SnapshotRef.java b/api/src/main/java/org/apache/iceberg/SnapshotRef.java index 3ff30bf21029..a7a9abc33d68 100644 --- a/api/src/main/java/org/apache/iceberg/SnapshotRef.java +++ b/api/src/main/java/org/apache/iceberg/SnapshotRef.java @@ -55,6 +55,14 @@ public SnapshotRefType type() { return type; } + public boolean isBranch() { + return type == SnapshotRefType.BRANCH; + } + + public boolean isTag() { + return type == SnapshotRefType.TAG; + } + public Integer minSnapshotsToKeep() { return minSnapshotsToKeep; } diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 25d015a48a0d..7650190448c8 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -153,14 +153,32 @@ public long snapshotId() { } } - class SetCurrentSnapshot implements MetadataUpdate { - private final Long snapshotId; + class RemoveSnapshotRef implements MetadataUpdate { + private final String name; - public SetCurrentSnapshot(Long snapshotId) { + public RemoveSnapshotRef(String name) { + this.name = name; + } + + public String name() { + return name; + } + } + + class SetSnapshotRef implements MetadataUpdate { + private final String name; + private final long snapshotId; + + public SetSnapshotRef(String name, long snapshotId) { + this.name = name; this.snapshotId = snapshotId; } - public Long snapshotId() { + public String name() { + return name; + } + + public long snapshotId() { return snapshotId; } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 7825500bc7cb..96fd0c6eecca 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -282,14 +282,18 @@ public void commit() { .run(taskOps -> { Snapshot newSnapshot = apply(); newSnapshotId.set(newSnapshot.snapshotId()); - TableMetadata updated; - if (stageOnly) { - updated = base.addStagedSnapshot(newSnapshot); + TableMetadata.Builder update = TableMetadata.buildFrom(base); + if (base.snapshot(newSnapshot.snapshotId()) != null) { + // this is a rollback operation + update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH); + } else if (stageOnly) { + update.addSnapshot(newSnapshot); } else { - updated = base.replaceCurrentSnapshot(newSnapshot); + update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH); } - if (updated == base) { + TableMetadata updated = update.build(); + if (updated.changes().isEmpty()) { // do not commit if the metadata has not changed. for example, this may happen when setting the current // snapshot to an ID that is already current. note that this check uses identity. return; diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 92aba60c529a..67f28d3bbbed 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -496,14 +496,6 @@ public TableMetadata replaceSortOrder(SortOrder newOrder) { return new Builder(this).setDefaultSortOrder(newOrder).build(); } - public TableMetadata addStagedSnapshot(Snapshot snapshot) { - return new Builder(this).addSnapshot(snapshot).build(); - } - - public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) { - return new Builder(this).setCurrentSnapshot(snapshot).build(); - } - public TableMetadata removeSnapshotsIf(Predicate removeIf) { List toRemove = snapshots.stream().filter(removeIf).collect(Collectors.toList()); return new Builder(this).removeSnapshots(toRemove).build(); @@ -610,7 +602,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update return new Builder(this) .upgradeFormatVersion(newFormatVersion) - .setCurrentSnapshot(null) + .removeBranch(SnapshotRef.MAIN_BRANCH) .setCurrentSchema(freshSchema, newLastColumnId.get()) .setDefaultPartitionSpec(freshSpec) .setDefaultSortOrder(freshSortOrder) @@ -936,23 +928,38 @@ public Builder addSnapshot(Snapshot snapshot) { return this; } - public Builder setCurrentSnapshot(Snapshot snapshot) { + public Builder setBranchSnapshot(Snapshot snapshot, String branch) { addSnapshot(snapshot); - setCurrentSnapshot(snapshot, null); + setBranchSnapshot(snapshot, branch, null); return this; } - public Builder setCurrentSnapshot(long snapshotId) { - if (currentSnapshotId == snapshotId) { + public Builder setBranchSnapshot(long snapshotId, String branch) { + SnapshotRef ref = refs.get(branch); + if (ref != null && ref.snapshotId() == snapshotId) { // change is a noop return this; } Snapshot snapshot = snapshotsById.get(snapshotId); - ValidationException.check(snapshot != null, - "Cannot set current snapshot to unknown: %s", snapshotId); + ValidationException.check(snapshot != null, "Cannot set %s to unknown snapshot: %s", branch, snapshotId); + + setBranchSnapshot(snapshot, branch, System.currentTimeMillis()); + + return this; + } - setCurrentSnapshot(snapshot, System.currentTimeMillis()); + public Builder removeBranch(String branch) { + if (SnapshotRef.MAIN_BRANCH.equals(branch)) { + this.currentSnapshotId = -1; + snapshotLog.clear(); + } + + SnapshotRef ref = refs.remove(branch); + if (ref != null) { + ValidationException.check(ref.isBranch(), "Cannot remove branch: %s is a tag", branch); + changes.add(new MetadataUpdate.RemoveSnapshotRef(branch)); + } return this; } @@ -972,10 +979,17 @@ public Builder removeSnapshots(List snapshotsToRemove) { } this.snapshots = retainedSnapshots; - if (!snapshotsById.containsKey(currentSnapshotId)) { - setCurrentSnapshot(null, System.currentTimeMillis()); + + // remove any refs that are no longer valid + Set danglingRefs = Sets.newHashSet(); + for (Map.Entry refEntry : refs.entrySet()) { + if (!snapshotsById.containsKey(refEntry.getValue().snapshotId())) { + danglingRefs.add(refEntry.getKey()); + } } + danglingRefs.forEach(this::removeBranch); + return this; } @@ -1180,16 +1194,14 @@ private int reuseOrCreateNewSortOrderId(SortOrder newOrder) { return newOrderId; } - private void setCurrentSnapshot(Snapshot snapshot, Long currentTimestampMillis) { - if (snapshot == null) { - this.currentSnapshotId = -1; - snapshotLog.clear(); - changes.add(new MetadataUpdate.SetCurrentSnapshot(null)); - return; - } - - if (currentSnapshotId == snapshot.snapshotId()) { - return; + private void setBranchSnapshot(Snapshot snapshot, String branch, Long currentTimestampMillis) { + long replacementSnapshotId = snapshot.snapshotId(); + SnapshotRef ref = refs.get(branch); + if (ref != null) { + ValidationException.check(ref.isBranch(), "Cannot update branch: %s is a tag", branch); + if (ref.snapshotId() == replacementSnapshotId) { + return; + } } ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber, @@ -1197,9 +1209,21 @@ private void setCurrentSnapshot(Snapshot snapshot, Long currentTimestampMillis) lastSequenceNumber, snapshot.sequenceNumber()); this.lastUpdatedMillis = currentTimestampMillis != null ? currentTimestampMillis : snapshot.timestampMillis(); - this.currentSnapshotId = snapshot.snapshotId(); - snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, snapshot.snapshotId())); - changes.add(new MetadataUpdate.SetCurrentSnapshot(snapshot.snapshotId())); + + if (SnapshotRef.MAIN_BRANCH.equals(branch)) { + this.currentSnapshotId = replacementSnapshotId; + snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, replacementSnapshotId)); + } + + SnapshotRef newRef; + if (ref != null) { + newRef = SnapshotRef.builderFrom(ref, replacementSnapshotId).build(); + } else { + newRef = SnapshotRef.branchBuilder(replacementSnapshotId).build(); + } + + refs.put(branch, newRef); + changes.add(new MetadataUpdate.SetSnapshotRef(branch, replacementSnapshotId)); } private static List addPreviousFile( @@ -1237,9 +1261,11 @@ private static Set intermediateSnapshotIdSet(List changes, // adds must always come before set current snapshot MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) update; addedSnapshotIds.add(addSnapshot.snapshot().snapshotId()); - } else if (update instanceof MetadataUpdate.SetCurrentSnapshot) { - Long snapshotId = ((MetadataUpdate.SetCurrentSnapshot) update).snapshotId(); - if (snapshotId != null && addedSnapshotIds.contains(snapshotId) && snapshotId != currentSnapshotId) { + } else if (update instanceof MetadataUpdate.SetSnapshotRef) { + MetadataUpdate.SetSnapshotRef setRef = (MetadataUpdate.SetSnapshotRef) update; + long snapshotId = setRef.snapshotId(); + if (addedSnapshotIds.contains(snapshotId) && + SnapshotRef.MAIN_BRANCH.equals(setRef.name()) && snapshotId != currentSnapshotId) { intermediateSnapshotIds.add(snapshotId); } } diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java index 6b406626c88c..d1069905e23d 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -23,6 +23,7 @@ import java.util.function.Predicate; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.exceptions.CommitFailedException; @@ -85,13 +86,15 @@ protected void refreshFromMetadataLocation(String newLocation, Predicate