diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CherrypickAncestorCommitException.java b/api/src/main/java/org/apache/iceberg/exceptions/CherrypickAncestorCommitException.java index 918cfe459edf..11b875aaf876 100644 --- a/api/src/main/java/org/apache/iceberg/exceptions/CherrypickAncestorCommitException.java +++ b/api/src/main/java/org/apache/iceberg/exceptions/CherrypickAncestorCommitException.java @@ -25,12 +25,12 @@ */ public class CherrypickAncestorCommitException extends ValidationException { - public CherrypickAncestorCommitException(Long snapshotId) { - super("Cannot cherrypick snapshot %s which is already an ancestor", String.valueOf(snapshotId)); + public CherrypickAncestorCommitException(long snapshotId) { + super("Cannot cherrypick snapshot %s: already an ancestor", String.valueOf(snapshotId)); } - public CherrypickAncestorCommitException(Long snapshotId, Long publishedAncestorId) { - super("Cannot cherrypick snapshot %s which was picked already to create an ancestor %s", + public CherrypickAncestorCommitException(long snapshotId, long publishedAncestorId) { + super("Cannot cherrypick snapshot %s: already picked to create ancestor %s", String.valueOf(snapshotId), String.valueOf(publishedAncestorId)); } } diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 3aa6edd1f8e7..84a5de642205 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -156,12 +156,12 @@ public ExpireSnapshots expireSnapshots() { @Override public Rollback rollback() { - return new RollbackToSnapshot(ops, this); + return new RollbackToSnapshot(ops); } @Override public ManageSnapshots manageSnapshots() { - return new SnapshotManager(ops, this); + return new SnapshotManager(ops); } @Override diff --git a/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java b/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java index 76da6667770c..9d6c8111a0c5 100644 --- a/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java @@ -21,8 +21,8 @@ class RollbackToSnapshot extends SnapshotManager implements Rollback { - RollbackToSnapshot(TableOperations ops, Table table) { - super(ops, table); + RollbackToSnapshot(TableOperations ops) { + super(ops); } @Override diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java index fdea118b14e8..06e3a97742c0 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java @@ -20,26 +20,28 @@ package org.apache.iceberg; import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.exceptions.CherrypickAncestorCommitException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.WapUtil; public class SnapshotManager extends MergingSnapshotProducer implements ManageSnapshots { - private SnapshotManagerOperation operation; - private TableMetadata base; - private Long targetSnapshotId = null; - private Table table; - - enum SnapshotManagerOperation { + private enum SnapshotManagerOperation { CHERRYPICK, ROLLBACK } - SnapshotManager(TableOperations ops, Table table) { + private final TableOperations ops; + private SnapshotManagerOperation managerOperation = null; + private Long targetSnapshotId = null; + private String snapshotOperation = null; + + SnapshotManager(TableOperations ops) { super(ops); - this.base = ops.current(); - this.table = table; + this.ops = ops; } @Override @@ -49,46 +51,50 @@ protected ManageSnapshots self() { @Override protected String operation() { - Snapshot manageSnapshot = base.snapshot(targetSnapshotId); - return manageSnapshot.operation(); + // snapshotOperation is used by SnapshotProducer when building and writing a new snapshot for cherrypick + Preconditions.checkNotNull(snapshotOperation, "[BUG] Detected uninitialized operation"); + return snapshotOperation; } @Override public ManageSnapshots cherrypick(long snapshotId) { - Preconditions.checkArgument(base.snapshot(snapshotId) != null, + TableMetadata base = ops.current(); + ValidationException.check(base.snapshot(snapshotId) != null, "Cannot cherry pick unknown snapshot id: %s", snapshotId); - operation = SnapshotManagerOperation.CHERRYPICK; - this.targetSnapshotId = snapshotId; - - // Pick modifications from the snapshot - Snapshot cherryPickSnapshot = base.snapshot(this.targetSnapshotId); + Snapshot cherryPickSnapshot = base.snapshot(snapshotId); // only append operations are currently supported if (!cherryPickSnapshot.operation().equals(DataOperations.APPEND)) { throw new UnsupportedOperationException("Can cherry pick only append operations"); } + this.managerOperation = SnapshotManagerOperation.CHERRYPICK; + this.targetSnapshotId = snapshotId; + this.snapshotOperation = cherryPickSnapshot.operation(); + + // Pick modifications from the snapshot for (DataFile addedFile : cherryPickSnapshot.addedFiles()) { add(addedFile); } + // this property is set on target snapshot that will get published - String wapId = WapUtil.stagedWapId(cherryPickSnapshot); - boolean isWapWorkflow = wapId != null && !wapId.isEmpty(); - if (isWapWorkflow) { + String wapId = WapUtil.validateWapPublish(base, targetSnapshotId); + if (wapId != null) { set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId); } + // link the snapshot about to be published on commit with the picked snapshot - set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(this.targetSnapshotId)); + set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId)); return this; } @Override public ManageSnapshots setCurrentSnapshot(long snapshotId) { - Preconditions.checkArgument(base.snapshot(snapshotId) != null, + ValidationException.check(ops.current().snapshot(snapshotId) != null, "Cannot roll back to unknown snapshot id: %s", snapshotId); - operation = SnapshotManagerOperation.ROLLBACK; + this.managerOperation = SnapshotManagerOperation.ROLLBACK; this.targetSnapshotId = snapshotId; return this; @@ -96,11 +102,12 @@ public ManageSnapshots setCurrentSnapshot(long snapshotId) { @Override public ManageSnapshots rollbackToTime(long timestampMillis) { - operation = SnapshotManagerOperation.ROLLBACK; // find the latest snapshot by timestamp older than timestampMillis - Snapshot snapshot = SnapshotUtil.findLatestSnapshotOlderThan(table, timestampMillis); + Snapshot snapshot = findLatestAncestorOlderThan(ops.current(), timestampMillis); Preconditions.checkArgument(snapshot != null, "Cannot roll back, no valid snapshot older than: %s", timestampMillis); + + this.managerOperation = SnapshotManagerOperation.ROLLBACK; this.targetSnapshotId = snapshot.snapshotId(); return this; @@ -108,30 +115,105 @@ public ManageSnapshots rollbackToTime(long timestampMillis) { @Override public ManageSnapshots rollbackTo(long snapshotId) { - Preconditions.checkArgument(base.snapshot(snapshotId) != null, + TableMetadata current = ops.current(); + ValidationException.check(current.snapshot(snapshotId) != null, "Cannot roll back to unknown snapshot id: %s", snapshotId); - - ValidationException.check(SnapshotUtil.isCurrentAncestor(table, snapshotId), + ValidationException.check( + !SnapshotUtil.ancestorIds(current.currentSnapshot(), current::snapshot).contains(snapshotId), "Cannot roll back to snapshot, not an ancestor of the current state: %s", snapshotId); return setCurrentSnapshot(snapshotId); } + private void validate(TableMetadata base) { + validateNonAncestor(base, targetSnapshotId); + WapUtil.validateWapPublish(base, targetSnapshotId); + } + + @Override + public List apply(TableMetadata base) { + // this apply method is called by SnapshotProducer, which refreshes the current table state + // because the state may have changed in that refresh, the validations must be done here + validate(base); + return super.apply(base); + } + @Override public Snapshot apply() { + TableMetadata base = ops.refresh(); + if (targetSnapshotId == null) { // if no target snapshot was configured then NOOP by returning current state - return table.currentSnapshot(); + return base.currentSnapshot(); } - switch (operation) { + + switch (managerOperation) { case CHERRYPICK: - WapUtil.validateNonAncestor(this.table, this.targetSnapshotId); - WapUtil.validateWapPublish(this.table, this.targetSnapshotId); - return super.apply(); + if (base.currentSnapshot().snapshotId() == base.snapshot(targetSnapshotId).parentId()) { + // the snapshot to cherrypick is already based on the current state: fast-forward + validate(base); + return base.snapshot(targetSnapshotId); + } else { + // validate(TableMetadata) is called in apply(TableMetadata) after this apply refreshes the table state + return super.apply(); + } + case ROLLBACK: return base.snapshot(targetSnapshotId); + default: - throw new ValidationException("Invalid SnapshotManagerOperation, " + - "only cherrypick, rollback are supported"); + throw new ValidationException("Invalid SnapshotManagerOperation: only cherrypick, rollback are supported"); + } + } + + private static void validateNonAncestor(TableMetadata meta, long snapshotId) { + if (isCurrentAncestor(meta, snapshotId)) { + throw new CherrypickAncestorCommitException(snapshotId); + } + + Long ancestorId = lookupAncestorBySourceSnapshot(meta, snapshotId); + if (ancestorId != null) { + throw new CherrypickAncestorCommitException(snapshotId, ancestorId); } } + + private static Long lookupAncestorBySourceSnapshot(TableMetadata meta, long snapshotId) { + String snapshotIdStr = String.valueOf(snapshotId); + for (long ancestorId : currentAncestors(meta)) { + Map summary = meta.snapshot(ancestorId).summary(); + if (summary != null && snapshotIdStr.equals(summary.get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP))) { + return ancestorId; + } + } + + return null; + } + + /** + * Return the latest snapshot whose timestamp is before the provided timestamp. + * + * @param meta {@link TableMetadata} for a table + * @param timestampMillis lookup snapshots before this timestamp + * @return the ID of the snapshot that was current at the given timestamp, or null + */ + private static Snapshot findLatestAncestorOlderThan(TableMetadata meta, long timestampMillis) { + long snapshotTimestamp = 0; + Snapshot result = null; + for (Long snapshotId : currentAncestors(meta)) { + Snapshot snapshot = meta.snapshot(snapshotId); + if (snapshot.timestampMillis() < timestampMillis && + snapshot.timestampMillis() > snapshotTimestamp) { + result = snapshot; + snapshotTimestamp = snapshot.timestampMillis(); + } + } + return result; + } + + private static List currentAncestors(TableMetadata meta) { + return SnapshotUtil.ancestorIds(meta.currentSnapshot(), meta::snapshot); + } + + private static boolean isCurrentAncestor(TableMetadata meta, long snapshotId) { + return currentAncestors(meta).contains(snapshotId); + } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index a37efe9cacc0..2fe6b9d43974 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -244,6 +244,12 @@ public void commit() { updated = base.replaceCurrentSnapshot(newSnapshot); } + if (updated == base) { + // do not commit if the metadata has not changed. for example, this may happen when setting the current + // snapshot to an ID that is already current. note that this check uses identity. + return; + } + // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries // to ensure that if a concurrent operation assigns the UUID, this operation will not fail. taskOps.commit(base, updated.withUUID()); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index f40bf06403b4..a6f86175917a 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -34,6 +34,7 @@ public class SnapshotSummary { public static final String TOTAL_RECORDS_PROP = "total-records"; public static final String DELETED_DUPLICATE_FILES = "deleted-duplicate-files"; public static final String CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"; + public static final String STAGED_WAP_ID_PROP = "wap.id"; public static final String PUBLISHED_WAP_ID_PROP = "published-wap-id"; public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id"; diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 76dfe55da456..e44e002bdab4 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -394,22 +394,19 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) { // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current if (snapshotsById.containsKey(snapshot.snapshotId())) { return setCurrentSnapshotTo(snapshot); - } else { - - List newSnapshots = ImmutableList.builder() - .addAll(snapshots) - .add(snapshot) - .build(); - List newSnapshotLog = ImmutableList.builder() - .addAll(snapshotLog) - .add(new SnapshotLogEntry(snapshot.timestampMillis(), - snapshot.snapshotId())) - .build(); - - return new TableMetadata(null, uuid, location, - snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties, - snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); } + + List newSnapshots = ImmutableList.builder() + .addAll(snapshots) + .add(snapshot) + .build(); + List newSnapshotLog = ImmutableList.builder() + .addAll(snapshotLog) + .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId())) + .build(); + return new TableMetadata(null, uuid, location, + snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties, + snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); } public TableMetadata removeSnapshotsIf(Predicate removeIf) { @@ -448,6 +445,11 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) { ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()), "Cannot set current snapshot to unknown: %s", snapshot.snapshotId()); + if (currentSnapshotId == snapshot.snapshotId()) { + // change is a noop + return this; + } + long nowMillis = System.currentTimeMillis(); List newSnapshotLog = ImmutableList.builder() .addAll(snapshotLog) diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index 1b6a841ee171..ac38a86bc2ad 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -55,29 +55,4 @@ public static List ancestorIds(Snapshot snapshot, Function } return ancestorIds; } - - public static boolean isCurrentAncestor(Table table, long snapshotId) { - List currentAncestors = SnapshotUtil.ancestorIds(table.currentSnapshot(), table::snapshot); - return currentAncestors.contains(snapshotId); - } - - /** - * Return the latest snapshot whose timestamp is before the provided timestamp. - * @param table Table representing the table state on which the snapshot is being looked up - * @param timestampMillis lookup snapshots before this timestamp - * @return - */ - public static Snapshot findLatestSnapshotOlderThan(Table table, long timestampMillis) { - long snapshotTimestamp = 0; - Snapshot result = null; - for (Long snapshotId : currentAncestors(table)) { - Snapshot snapshot = table.snapshot(snapshotId); - if (snapshot.timestampMillis() < timestampMillis && - snapshot.timestampMillis() > snapshotTimestamp) { - result = snapshot; - snapshotTimestamp = snapshot.timestampMillis(); - } - } - return result; - } } diff --git a/core/src/main/java/org/apache/iceberg/util/WapUtil.java b/core/src/main/java/org/apache/iceberg/util/WapUtil.java index 5c090ea5b34c..3ecd44a52241 100644 --- a/core/src/main/java/org/apache/iceberg/util/WapUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/WapUtil.java @@ -19,14 +19,9 @@ package org.apache.iceberg.util; -import com.google.common.base.Preconditions; -import java.util.Map; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; -import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.exceptions.CherrypickAncestorCommitException; import org.apache.iceberg.exceptions.DuplicateWAPCommitException; public class WapUtil { @@ -35,71 +30,39 @@ private WapUtil() { } public static String stagedWapId(Snapshot snapshot) { - return snapshot.summary() != null ? snapshot.summary().getOrDefault("wap.id", null) : null; + return snapshot.summary() != null ? snapshot.summary().get(SnapshotSummary.STAGED_WAP_ID_PROP) : null; + } + + public static String publishedWapId(Snapshot snapshot) { + return snapshot.summary() != null ? snapshot.summary().get(SnapshotSummary.PUBLISHED_WAP_ID_PROP) : null; } /** - * Check if a given staged snapshot's associated wap-id was already published. Does not fail for non-WAP workflows - * @param table a {@link Table} + * Check if a given staged snapshot's associated wap-id was already published. Does not fail for non-WAP workflows. + * + * @param current the current {@link TableMetadata metadata} for the target table * @param wapSnapshotId a snapshot id which could have been staged and is associated with a wap id + * @return the WAP ID that will be published, if the snapshot has one */ - public static void validateWapPublish(Table table, Long wapSnapshotId) { - Preconditions.checkArgument(table instanceof HasTableOperations, - "Cannot validate WAP publish on a table that doesn't expose its TableOperations"); - TableMetadata baseMeta = ((HasTableOperations) table).operations().current(); - Snapshot cherryPickSnapshot = ((HasTableOperations) table).operations() - .current().snapshot(wapSnapshotId); + public static String validateWapPublish(TableMetadata current, long wapSnapshotId) { + Snapshot cherryPickSnapshot = current.snapshot(wapSnapshotId); String wapId = stagedWapId(cherryPickSnapshot); - if (isWapWorkflow(baseMeta, wapSnapshotId)) { - if (WapUtil.isWapIdPublished(table, wapId)) { + if (wapId != null && !wapId.isEmpty()) { + if (WapUtil.isWapIdPublished(current, wapId)) { throw new DuplicateWAPCommitException(wapId); } } - } - - /** - * Ensure that the given snapshot picked to be committed is not already an ancestor. This check applies to non-WAP - * snapshots too. Also check if the picked snapshot wasn't cherrypicked earlier. - * @param table table a {@link Table} - * @param snapshotId a snapshot id picked for cherrypick operation - */ - public static void validateNonAncestor(Table table, Long snapshotId) { - if (SnapshotUtil.isCurrentAncestor(table, snapshotId)) { - throw new CherrypickAncestorCommitException(snapshotId); - } - Long ancestorId = lookupAncestorBySourceSnapshot(table, snapshotId); - if (ancestorId != null) { - throw new CherrypickAncestorCommitException(snapshotId, ancestorId); - } - } - - private static Long lookupAncestorBySourceSnapshot(Table table, Long snapshotId) { - Long ancestorId = null; - String snapshotIdStr = String.valueOf(snapshotId); - for (Long publishedSnapshotId : SnapshotUtil.currentAncestors(table)) { - Map summary = table.snapshot(publishedSnapshotId).summary(); - if (summary != null && snapshotIdStr.equals(summary.get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP))) { - ancestorId = publishedSnapshotId; - break; - } - } - return ancestorId; - } - private static boolean isWapWorkflow(TableMetadata base, Long snapshotId) { - String wapId = stagedWapId(base.snapshot(snapshotId)); - return wapId != null && !wapId.isEmpty(); + return wapId; } - private static boolean isWapIdPublished(Table table, String wapId) { - boolean isPublished = false; - for (Long publishedSnapshotId : SnapshotUtil.currentAncestors(table)) { - Map summary = table.snapshot(publishedSnapshotId).summary(); - if (summary != null && wapId.equals(summary.get(SnapshotSummary.PUBLISHED_WAP_ID_PROP))) { - isPublished = true; - break; + private static boolean isWapIdPublished(TableMetadata current, String wapId) { + for (long ancestorId : SnapshotUtil.ancestorIds(current.currentSnapshot(), current::snapshot)) { + Snapshot snapshot = current.snapshot(ancestorId); + if (wapId.equals(stagedWapId(snapshot)) || wapId.equals(publishedWapId(snapshot))) { + return true; } } - return isPublished; + return false; } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java index d067e507f0f0..701457474110 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java @@ -40,6 +40,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -152,7 +153,7 @@ protected void commitOperation(SnapshotUpdate operation, int numFiles, String if (isWapTable() && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot - operation.set("wap.id", wapId); + operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId); operation.stageOnly(); }