Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
*/
public class CherrypickAncestorCommitException extends ValidationException {

public CherrypickAncestorCommitException(Long snapshotId) {
super("Cannot cherrypick snapshot %s which is already an ancestor", String.valueOf(snapshotId));
public CherrypickAncestorCommitException(long snapshotId) {
super("Cannot cherrypick snapshot %s: already an ancestor", String.valueOf(snapshotId));
}

public CherrypickAncestorCommitException(Long snapshotId, Long publishedAncestorId) {
super("Cannot cherrypick snapshot %s which was picked already to create an ancestor %s",
public CherrypickAncestorCommitException(long snapshotId, long publishedAncestorId) {
super("Cannot cherrypick snapshot %s: already picked to create ancestor %s",
String.valueOf(snapshotId), String.valueOf(publishedAncestorId));
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ public ExpireSnapshots expireSnapshots() {

@Override
public Rollback rollback() {
return new RollbackToSnapshot(ops, this);
return new RollbackToSnapshot(ops);
}

@Override
public ManageSnapshots manageSnapshots() {
return new SnapshotManager(ops, this);
return new SnapshotManager(ops);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

class RollbackToSnapshot extends SnapshotManager implements Rollback {

RollbackToSnapshot(TableOperations ops, Table table) {
super(ops, table);
RollbackToSnapshot(TableOperations ops) {
super(ops);
}

@Override
Expand Down
152 changes: 117 additions & 35 deletions core/src/main/java/org/apache/iceberg/SnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,28 @@
package org.apache.iceberg;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.WapUtil;

public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> implements ManageSnapshots {

private SnapshotManagerOperation operation;
private TableMetadata base;
private Long targetSnapshotId = null;
private Table table;

enum SnapshotManagerOperation {
private enum SnapshotManagerOperation {
CHERRYPICK,
ROLLBACK
}

SnapshotManager(TableOperations ops, Table table) {
private final TableOperations ops;
private SnapshotManagerOperation managerOperation = null;
private Long targetSnapshotId = null;
private String snapshotOperation = null;

SnapshotManager(TableOperations ops) {
super(ops);
this.base = ops.current();
this.table = table;
this.ops = ops;
}

@Override
Expand All @@ -49,89 +51,169 @@ protected ManageSnapshots self() {

@Override
protected String operation() {
Snapshot manageSnapshot = base.snapshot(targetSnapshotId);
return manageSnapshot.operation();
// snapshotOperation is used by SnapshotProducer when building and writing a new snapshot for cherrypick
Preconditions.checkNotNull(snapshotOperation, "[BUG] Detected uninitialized operation");
return snapshotOperation;
}

@Override
public ManageSnapshots cherrypick(long snapshotId) {
Preconditions.checkArgument(base.snapshot(snapshotId) != null,
TableMetadata base = ops.current();
ValidationException.check(base.snapshot(snapshotId) != null,
"Cannot cherry pick unknown snapshot id: %s", snapshotId);

operation = SnapshotManagerOperation.CHERRYPICK;
this.targetSnapshotId = snapshotId;

// Pick modifications from the snapshot
Snapshot cherryPickSnapshot = base.snapshot(this.targetSnapshotId);
Snapshot cherryPickSnapshot = base.snapshot(snapshotId);
// only append operations are currently supported
if (!cherryPickSnapshot.operation().equals(DataOperations.APPEND)) {
throw new UnsupportedOperationException("Can cherry pick only append operations");
}

this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
this.targetSnapshotId = snapshotId;
this.snapshotOperation = cherryPickSnapshot.operation();

// Pick modifications from the snapshot
for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
add(addedFile);
}

// this property is set on target snapshot that will get published
String wapId = WapUtil.stagedWapId(cherryPickSnapshot);
boolean isWapWorkflow = wapId != null && !wapId.isEmpty();
if (isWapWorkflow) {
String wapId = WapUtil.validateWapPublish(base, targetSnapshotId);
if (wapId != null) {
set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId);
}

// link the snapshot about to be published on commit with the picked snapshot
set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(this.targetSnapshotId));
set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId));

return this;
}

@Override
public ManageSnapshots setCurrentSnapshot(long snapshotId) {
Preconditions.checkArgument(base.snapshot(snapshotId) != null,
ValidationException.check(ops.current().snapshot(snapshotId) != null,
"Cannot roll back to unknown snapshot id: %s", snapshotId);

operation = SnapshotManagerOperation.ROLLBACK;
this.managerOperation = SnapshotManagerOperation.ROLLBACK;
this.targetSnapshotId = snapshotId;

return this;
}

@Override
public ManageSnapshots rollbackToTime(long timestampMillis) {
operation = SnapshotManagerOperation.ROLLBACK;
// find the latest snapshot by timestamp older than timestampMillis
Snapshot snapshot = SnapshotUtil.findLatestSnapshotOlderThan(table, timestampMillis);
Snapshot snapshot = findLatestAncestorOlderThan(ops.current(), timestampMillis);
Preconditions.checkArgument(snapshot != null,
"Cannot roll back, no valid snapshot older than: %s", timestampMillis);

this.managerOperation = SnapshotManagerOperation.ROLLBACK;
this.targetSnapshotId = snapshot.snapshotId();

return this;
}

@Override
public ManageSnapshots rollbackTo(long snapshotId) {
Preconditions.checkArgument(base.snapshot(snapshotId) != null,
TableMetadata current = ops.current();
ValidationException.check(current.snapshot(snapshotId) != null,
"Cannot roll back to unknown snapshot id: %s", snapshotId);

ValidationException.check(SnapshotUtil.isCurrentAncestor(table, snapshotId),
ValidationException.check(
!SnapshotUtil.ancestorIds(current.currentSnapshot(), current::snapshot).contains(snapshotId),
"Cannot roll back to snapshot, not an ancestor of the current state: %s", snapshotId);
return setCurrentSnapshot(snapshotId);
}

/**
 * Runs the cherry-pick validations for the configured target snapshot against the given metadata.
 *
 * @param base table metadata to validate against
 */
private void validate(TableMetadata base) {
  // targetSnapshotId is a boxed Long field; it is expected to be set before this is called
  // ancestor check runs first so an already-picked snapshot is reported before the WAP validation
  validateNonAncestor(base, this.targetSnapshotId);
  WapUtil.validateWapPublish(base, this.targetSnapshotId);
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
  // SnapshotProducer calls this overload after it has refreshed the current table state.
  // The refreshed state may differ from what was seen earlier, so validation has to run here,
  // against the metadata that the new snapshot will actually be based on.
  validate(base);

  List<ManifestFile> manifests = super.apply(base);
  return manifests;
}

@Override
public Snapshot apply() {
TableMetadata base = ops.refresh();

if (targetSnapshotId == null) {
// if no target snapshot was configured then NOOP by returning current state
return table.currentSnapshot();
return base.currentSnapshot();
}
switch (operation) {

switch (managerOperation) {
case CHERRYPICK:
WapUtil.validateNonAncestor(this.table, this.targetSnapshotId);
WapUtil.validateWapPublish(this.table, this.targetSnapshotId);
return super.apply();
if (base.currentSnapshot().snapshotId() == base.snapshot(targetSnapshotId).parentId()) {
// the snapshot to cherrypick is already based on the current state: fast-forward
validate(base);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many times do you reckon validate(base) would be called per snapshot commit? I'm asking because it's an expensive operation: validate is Θ(2N), where N is the number of ancestors, which can grow monotonically.

Copy link
Collaborator

@prodeezy prodeezy Jan 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR only calls validate(base) once for each commit, so your PR doesn't add any further complexity to what we already had in the original changes. That said, I'm just pondering how this can impact commit performance more generally. (This doesn't have to be addressed in this PR.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to be concerned about performance here. Tables usually have under a few thousand snapshots for us, and we have very frequent commits and a 7 day retention window. Even if we process a few thousand snapshots a couple times, it's not going to be much of a delay because these are all in memory.

Also, we have to process the snapshots for each new version because the set of valid snapshots may have changed. Every time the commit fails, this will retry and validate the latest table state, which is unavoidable. In the best case, that will happen just once, but it could be up to the number of retries configured for the table.

return base.snapshot(targetSnapshotId);
} else {
// validate(TableMetadata) is called in apply(TableMetadata) after this apply refreshes the table state
return super.apply();
}

case ROLLBACK:
return base.snapshot(targetSnapshotId);

default:
throw new ValidationException("Invalid SnapshotManagerOperation, " +
"only cherrypick, rollback are supported");
throw new ValidationException("Invalid SnapshotManagerOperation: only cherrypick, rollback are supported");
}
}

/**
 * Rejects a cherry-pick of a snapshot that is already part of the current table history.
 * <p>
 * Two cases are rejected: the snapshot is itself an ancestor of the current snapshot, or some
 * ancestor's summary names it as its source snapshot (it was picked before).
 *
 * @param meta table metadata to check against
 * @param snapshotId ID of the snapshot about to be cherry-picked
 * @throws CherrypickAncestorCommitException if either case applies
 */
private static void validateNonAncestor(TableMetadata meta, long snapshotId) {
  boolean alreadyAncestor = isCurrentAncestor(meta, snapshotId);
  if (alreadyAncestor) {
    throw new CherrypickAncestorCommitException(snapshotId);
  }

  Long publishedAncestorId = lookupAncestorBySourceSnapshot(meta, snapshotId);
  if (publishedAncestorId == null) {
    // no ancestor was created from this snapshot: nothing to reject
    return;
  }

  throw new CherrypickAncestorCommitException(snapshotId, publishedAncestorId);
}

/**
 * Finds the ancestor of the current snapshot whose summary records the given snapshot as its source.
 *
 * @param meta table metadata whose current ancestors are searched
 * @param snapshotId ID of the source snapshot to look for
 * @return the ID of the first matching ancestor, or null if no ancestor references the snapshot
 */
private static Long lookupAncestorBySourceSnapshot(TableMetadata meta, long snapshotId) {
  // summary values are strings, so compare against the string form of the ID
  final String expectedSource = String.valueOf(snapshotId);

  for (long candidateId : currentAncestors(meta)) {
    Map<String, String> props = meta.snapshot(candidateId).summary();
    if (props == null) {
      continue;
    }

    String recordedSource = props.get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP);
    if (expectedSource.equals(recordedSource)) {
      return candidateId;
    }
  }

  return null;
}

/**
 * Return the most recent ancestor of the current snapshot whose timestamp is strictly before the
 * provided timestamp.
 *
 * @param meta {@link TableMetadata} for a table
 * @param timestampMillis lookup snapshots before this timestamp
 * @return the matching {@link Snapshot}, or null if no ancestor is older than the timestamp
 */
private static Snapshot findLatestAncestorOlderThan(TableMetadata meta, long timestampMillis) {
  Snapshot latest = null;
  long latestMillis = 0;

  for (Long ancestorId : currentAncestors(meta)) {
    Snapshot candidate = meta.snapshot(ancestorId);
    long candidateMillis = candidate.timestampMillis();

    boolean beforeCutoff = candidateMillis < timestampMillis;
    boolean newerThanBest = candidateMillis > latestMillis;
    if (beforeCutoff && newerThanBest) {
      latest = candidate;
      latestMillis = candidateMillis;
    }
  }

  return latest;
}

/**
 * Returns the IDs reported by {@link SnapshotUtil#ancestorIds} when walking from the table's
 * current snapshot, resolving each snapshot through the given metadata.
 *
 * @param meta table metadata to walk
 * @return ancestor snapshot IDs of the current snapshot
 */
private static List<Long> currentAncestors(TableMetadata meta) {
  Snapshot head = meta.currentSnapshot();
  return SnapshotUtil.ancestorIds(head, meta::snapshot);
}

/**
 * Checks whether a snapshot ID appears among the ancestors of the table's current snapshot.
 *
 * @param meta table metadata to check against
 * @param snapshotId ID of the snapshot to look for
 * @return true if the ID is in the current ancestor list
 */
private static boolean isCurrentAncestor(TableMetadata meta, long snapshotId) {
  List<Long> ancestors = currentAncestors(meta);
  return ancestors.contains(snapshotId);
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ public void commit() {
updated = base.replaceCurrentSnapshot(newSnapshot);
}

if (updated == base) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// do not commit if the metadata has not changed. for example, this may happen when setting the current
// snapshot to an ID that is already current. note that this check uses identity.
return;
}

// if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
// to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
taskOps.commit(base, updated.withUUID());
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class SnapshotSummary {
public static final String TOTAL_RECORDS_PROP = "total-records";
public static final String DELETED_DUPLICATE_FILES = "deleted-duplicate-files";
public static final String CHANGED_PARTITION_COUNT_PROP = "changed-partition-count";
public static final String STAGED_WAP_ID_PROP = "wap.id";
public static final String PUBLISHED_WAP_ID_PROP = "published-wap-id";
public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";

Expand Down
32 changes: 17 additions & 15 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,22 +394,19 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
// there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
if (snapshotsById.containsKey(snapshot.snapshotId())) {
return setCurrentSnapshotTo(snapshot);
} else {

List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
.addAll(snapshots)
.add(snapshot)
.build();
List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
.addAll(snapshotLog)
.add(new SnapshotLogEntry(snapshot.timestampMillis(),
snapshot.snapshotId()))
.build();

return new TableMetadata(null, uuid, location,
snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
.addAll(snapshots)
.add(snapshot)
.build();
List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
.addAll(snapshotLog)
.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
.build();
return new TableMetadata(null, uuid, location,
snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
Expand Down Expand Up @@ -448,6 +445,11 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()),
"Cannot set current snapshot to unknown: %s", snapshot.snapshotId());

if (currentSnapshotId == snapshot.snapshotId()) {
// change is a noop
return this;
}

long nowMillis = System.currentTimeMillis();
List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
.addAll(snapshotLog)
Expand Down
25 changes: 0 additions & 25 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,4 @@ public static List<Long> ancestorIds(Snapshot snapshot, Function<Long, Snapshot>
}
return ancestorIds;
}

public static boolean isCurrentAncestor(Table table, long snapshotId) {
List<Long> currentAncestors = SnapshotUtil.ancestorIds(table.currentSnapshot(), table::snapshot);
return currentAncestors.contains(snapshotId);
}

/**
* Return the latest snapshot whose timestamp is before the provided timestamp.
* @param table Table representing the table state on which the snapshot is being looked up
* @param timestampMillis lookup snapshots before this timestamp
* @return
*/
public static Snapshot findLatestSnapshotOlderThan(Table table, long timestampMillis) {
long snapshotTimestamp = 0;
Snapshot result = null;
for (Long snapshotId : currentAncestors(table)) {
Snapshot snapshot = table.snapshot(snapshotId);
if (snapshot.timestampMillis() < timestampMillis &&
snapshot.timestampMillis() > snapshotTimestamp) {
result = snapshot;
snapshotTimestamp = snapshot.timestampMillis();
}
}
return result;
}
}
Loading