Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotRef.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public SnapshotRefType type() {
return type;
}

public boolean isBranch() {
return type == SnapshotRefType.BRANCH;
}

public boolean isTag() {
return type == SnapshotRefType.TAG;
}

public Integer minSnapshotsToKeep() {
return minSnapshotsToKeep;
}
Expand Down
26 changes: 22 additions & 4 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,32 @@ public long snapshotId() {
}
}

class SetCurrentSnapshot implements MetadataUpdate {
private final Long snapshotId;
class RemoveSnapshotRef implements MetadataUpdate {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoveSnapshotRef is used for dropping a branch instead of SetCurrentSnapshot(null). Now the snapshot can be required instead of nullable.

private final String name;

public SetCurrentSnapshot(Long snapshotId) {
public RemoveSnapshotRef(String name) {
this.name = name;
}

public String name() {
return name;
}
}

class SetSnapshotRef implements MetadataUpdate {
private final String name;
private final long snapshotId;

public SetSnapshotRef(String name, long snapshotId) {
this.name = name;
this.snapshotId = snapshotId;
}

public Long snapshotId() {
public String name() {
return name;
}

public long snapshotId() {
return snapshotId;
}
}
Expand Down
14 changes: 9 additions & 5 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,18 @@ public void commit() {
.run(taskOps -> {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata updated;
if (stageOnly) {
updated = base.addStagedSnapshot(newSnapshot);
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wypoon and @sririshindra, I think this fixes the problem from #4088. The previous call to replaceCurrentSnapshot always used setCurrentSnapshot(Snapshot, String) rather than setCurrentSnapshot(long, String). This detects when the snapshot already exists and calls the correct method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that makes sense.
I cherry-picked some prerequisite commits, and then applied this change to our 0.13.0 branch, and reran @sririshindra's rollback test, and it passed.

} else if (stageOnly) {
update.addSnapshot(newSnapshot);
} else {
updated = base.replaceCurrentSnapshot(newSnapshot);
update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH);
}

if (updated == base) {
TableMetadata updated = update.build();
if (updated.changes().isEmpty()) {
// do not commit if the metadata has not changed. for example, this may happen when setting the current
// snapshot to an ID that is already current. note that this check uses identity.
return;
Expand Down
94 changes: 60 additions & 34 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,6 @@ public TableMetadata replaceSortOrder(SortOrder newOrder) {
return new Builder(this).setDefaultSortOrder(newOrder).build();
}

public TableMetadata addStagedSnapshot(Snapshot snapshot) {
return new Builder(this).addSnapshot(snapshot).build();
}

public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
return new Builder(this).setCurrentSnapshot(snapshot).build();
}

public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
List<Snapshot> toRemove = snapshots.stream().filter(removeIf).collect(Collectors.toList());
return new Builder(this).removeSnapshots(toRemove).build();
Expand Down Expand Up @@ -610,7 +602,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update

return new Builder(this)
.upgradeFormatVersion(newFormatVersion)
.setCurrentSnapshot(null)
.removeBranch(SnapshotRef.MAIN_BRANCH)
.setCurrentSchema(freshSchema, newLastColumnId.get())
.setDefaultPartitionSpec(freshSpec)
.setDefaultSortOrder(freshSortOrder)
Expand Down Expand Up @@ -936,23 +928,38 @@ public Builder addSnapshot(Snapshot snapshot) {
return this;
}

public Builder setCurrentSnapshot(Snapshot snapshot) {
public Builder setBranchSnapshot(Snapshot snapshot, String branch) {
addSnapshot(snapshot);
setCurrentSnapshot(snapshot, null);
setBranchSnapshot(snapshot, branch, null);
return this;
}

public Builder setCurrentSnapshot(long snapshotId) {
if (currentSnapshotId == snapshotId) {
public Builder setBranchSnapshot(long snapshotId, String branch) {
SnapshotRef ref = refs.get(branch);
if (ref != null && ref.snapshotId() == snapshotId) {
// change is a noop
return this;
}

Snapshot snapshot = snapshotsById.get(snapshotId);
ValidationException.check(snapshot != null,
"Cannot set current snapshot to unknown: %s", snapshotId);
ValidationException.check(snapshot != null, "Cannot set %s to unknown snapshot: %s", branch, snapshotId);

setBranchSnapshot(snapshot, branch, System.currentTimeMillis());

return this;
}

setCurrentSnapshot(snapshot, System.currentTimeMillis());
public Builder removeBranch(String branch) {
if (SnapshotRef.MAIN_BRANCH.equals(branch)) {
this.currentSnapshotId = -1;
snapshotLog.clear();
}

SnapshotRef ref = refs.remove(branch);
if (ref != null) {
ValidationException.check(ref.isBranch(), "Cannot remove branch: %s is a tag", branch);
changes.add(new MetadataUpdate.RemoveSnapshotRef(branch));
}

return this;
}
Expand All @@ -972,10 +979,17 @@ public Builder removeSnapshots(List<Snapshot> snapshotsToRemove) {
}

this.snapshots = retainedSnapshots;
if (!snapshotsById.containsKey(currentSnapshotId)) {
setCurrentSnapshot(null, System.currentTimeMillis());

// remove any refs that are no longer valid
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this fail if there are still refs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the current usage, we are only calling this when expiring snapshots. At that time the snapshots send in here should already be the ones that have to be removed after evaluating all retention policies. I think that's why we directly remove the refs instead of failing.

Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar Feb 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is in the scope for removeSnapshots(snapshots) to remove the references itself for any snapshots we are removing. Otherwise we would have to have the caller do a separate removeReferences and then do a removeSnapshots which seems more cumbersome.

Set<String> danglingRefs = Sets.newHashSet();
for (Map.Entry<String, SnapshotRef> refEntry : refs.entrySet()) {
if (!snapshotsById.containsKey(refEntry.getValue().snapshotId())) {
danglingRefs.add(refEntry.getKey());
}
}

danglingRefs.forEach(this::removeBranch);

return this;
}

Expand Down Expand Up @@ -1180,26 +1194,36 @@ private int reuseOrCreateNewSortOrderId(SortOrder newOrder) {
return newOrderId;
}

private void setCurrentSnapshot(Snapshot snapshot, Long currentTimestampMillis) {
if (snapshot == null) {
this.currentSnapshotId = -1;
snapshotLog.clear();
changes.add(new MetadataUpdate.SetCurrentSnapshot(null));
return;
}

if (currentSnapshotId == snapshot.snapshotId()) {
return;
private void setBranchSnapshot(Snapshot snapshot, String branch, Long currentTimestampMillis) {
long replacementSnapshotId = snapshot.snapshotId();
SnapshotRef ref = refs.get(branch);
if (ref != null) {
ValidationException.check(ref.isBranch(), "Cannot update branch: %s is a tag", branch);
if (ref.snapshotId() == replacementSnapshotId) {
return;
}
}

ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber,
"Last sequence number %s is less than existing snapshot sequence number %s",
lastSequenceNumber, snapshot.sequenceNumber());

this.lastUpdatedMillis = currentTimestampMillis != null ? currentTimestampMillis : snapshot.timestampMillis();
this.currentSnapshotId = snapshot.snapshotId();
snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, snapshot.snapshotId()));
changes.add(new MetadataUpdate.SetCurrentSnapshot(snapshot.snapshotId()));

if (SnapshotRef.MAIN_BRANCH.equals(branch)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like the check for main branch is done in many places, should we also have a method for ref.isMainBranch()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is that there aren't many places where we actually have a SnapshotRef instance. Here, for example, we don't necessarily have one because ref might be null. So we actually want to use the string branch name.

Below, we have a SetSnapshotRef instance, which is a change. And in places like removeBranch, we do the check before loading the ref. If there is no main ref to remove, we should still ensure that we set current-snapshot-id appropriately.

this.currentSnapshotId = replacementSnapshotId;
snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, replacementSnapshotId));
}

SnapshotRef newRef;
if (ref != null) {
newRef = SnapshotRef.builderFrom(ref, replacementSnapshotId).build();
} else {
newRef = SnapshotRef.branchBuilder(replacementSnapshotId).build();
}

refs.put(branch, newRef);
changes.add(new MetadataUpdate.SetSnapshotRef(branch, replacementSnapshotId));
}

private static List<MetadataLogEntry> addPreviousFile(
Expand Down Expand Up @@ -1237,9 +1261,11 @@ private static Set<Long> intermediateSnapshotIdSet(List<MetadataUpdate> changes,
// adds must always come before set current snapshot
MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) update;
addedSnapshotIds.add(addSnapshot.snapshot().snapshotId());
} else if (update instanceof MetadataUpdate.SetCurrentSnapshot) {
Long snapshotId = ((MetadataUpdate.SetCurrentSnapshot) update).snapshotId();
if (snapshotId != null && addedSnapshotIds.contains(snapshotId) && snapshotId != currentSnapshotId) {
} else if (update instanceof MetadataUpdate.SetSnapshotRef) {
MetadataUpdate.SetSnapshotRef setRef = (MetadataUpdate.SetSnapshotRef) update;
long snapshotId = setRef.snapshotId();
if (addedSnapshotIds.contains(snapshotId) &&
SnapshotRef.MAIN_BRANCH.equals(setRef.name()) && snapshotId != currentSnapshotId) {
intermediateSnapshotIds.add(snapshotId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Predicate;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.exceptions.CommitFailedException;
Expand Down Expand Up @@ -85,13 +86,15 @@ protected void refreshFromMetadataLocation(String newLocation, Predicate<Excepti

private TableMetadata loadTableMetadata(String metadataLocation) {
// Update the TableMetadata with the Content of NessieTableState.
return TableMetadata.buildFrom(TableMetadataParser.read(io(), metadataLocation))
.setCurrentSnapshot(table.getSnapshotId())
TableMetadata.Builder builder = TableMetadata.buildFrom(TableMetadataParser.read(io(), metadataLocation))
.setCurrentSchema(table.getSchemaId())
.setDefaultSortOrder(table.getSortOrderId())
.setDefaultPartitionSpec(table.getSpecId())
.discardChanges()
.build();
.setDefaultPartitionSpec(table.getSpecId());
if (table.getSnapshotId() != -1) {
builder.setBranchSnapshot(table.getSnapshotId(), SnapshotRef.MAIN_BRANCH);
}

return builder.discardChanges().build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ public void testHistoryTable() {
TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(0), actual.get(0));
TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(1), actual.get(1));
TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(2), actual.get(2));
TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(3), actual.get(3));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was failing, I think due to the problem in #4088. While looking at it, I realized that the last record was not being checked as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong about the cause of the history test failure. It turns out that I called the wrong method from setBranchSnapshot(Snapshot, String).

}

@Test
Expand Down