diff --git a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
new file mode 100644
index 000000000000..153f1e02712d
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.DuplicateWAPCommitException;
+import org.apache.iceberg.exceptions.ValidationException;
+
+/**
+ * API for managing snapshots. Allows rolling table data back to its state at an older {@link Snapshot snapshot}.
+ * <p>Rollback:
+ * <p>
+ * This API does not allow conflicting calls to {@link #setCurrentSnapshot(long)} and
+ * {@link #rollbackToTime(long)}.
+ * <p>
+ * When committing, these changes will be applied to the current table metadata. Commit conflicts
+ * will not be resolved and will result in a {@link CommitFailedException}.
+ * <p>Cherrypick:
+ * <p>
+ * In an audit workflow, new data is written to an orphan {@link Snapshot snapshot} that is not committed as
+ * the table's current state until it is audited. After auditing a change, it may need to be applied or cherry-picked
+ * on top of the latest snapshot instead of the one that was current when the audited changes were created.
+ * This class adds support for cherry-picking the changes from an orphan snapshot by applying them to
+ * the current snapshot. The output of the operation is a new snapshot with the changes from the cherry-picked
+ * snapshot.
+ *
+ */
+public interface ManageSnapshots extends PendingUpdate<Snapshot> {
+
+  /**
+   * Roll this table's data back to a specific {@link Snapshot} identified by id.
+   *
+   * @param snapshotId long id of the snapshot to roll back table data to
+   * @return this for method chaining
+   * @throws ValidationException If the table has no snapshot with the given id
+   */
+  ManageSnapshots setCurrentSnapshot(long snapshotId);
+
+  /**
+   * Roll this table's data back to the last {@link Snapshot} before the given timestamp.
+   *
+   * @param timestampMillis a long timestamp, as returned by {@link System#currentTimeMillis()}
+   * @return this for method chaining
+   * @throws IllegalArgumentException If the table has no old snapshot before the given timestamp
+   */
+  ManageSnapshots rollbackToTime(long timestampMillis);
+
+  /**
+   * Roll this table's state back to a specific {@link Snapshot} identified by id.
+   * @param snapshotId long id of the snapshot to roll back table to. Must be an ancestor of the current snapshot
+   * @return this for method chaining
+   * @throws ValidationException If the table has no snapshot with the given id, or it is not a current ancestor
+   */
+  ManageSnapshots rollbackTo(long snapshotId);
+
+  /**
+   * Apply supported changes in given snapshot and create a new snapshot which will be set as the
+   * current snapshot on commit.
+   * @param snapshotId a snapshotId whose changes to apply
+   * @return this for method chaining
+   * @throws ValidationException If the table has no snapshot with the given id
+   * @throws DuplicateWAPCommitException In case of a WAP workflow and if the table has a duplicate commit with same
+   *                                     wapId
+   */
+  ManageSnapshots cherrypick(long snapshotId);
+}
diff --git a/api/src/main/java/org/apache/iceberg/Rollback.java b/api/src/main/java/org/apache/iceberg/Rollback.java
index 0992969ce13b..b73353943670 100644
--- a/api/src/main/java/org/apache/iceberg/Rollback.java
+++ b/api/src/main/java/org/apache/iceberg/Rollback.java
@@ -38,7 +38,9 @@ public interface Rollback extends PendingUpdate {
* @param snapshotId long id of the snapshot to roll back table data to
* @return this for method chaining
* @throws IllegalArgumentException If the table has no snapshot with the given id
+ * @deprecated Replaced by {@link ManageSnapshots#setCurrentSnapshot(long)}
*/
+ @Deprecated
Rollback toSnapshotId(long snapshotId);
/**
@@ -47,7 +49,8 @@ public interface Rollback extends PendingUpdate {
* @param timestampMillis a long timestamp, as returned by {@link System#currentTimeMillis()}
* @return this for method chaining
* @throws IllegalArgumentException If the table has no old snapshot before the given timestamp
+ * @deprecated Replaced by {@link ManageSnapshots#rollbackToTime(long)}
*/
+ @Deprecated
Rollback toSnapshotAtTime(long timestampMillis);
-
}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index 63c55e53ebe2..1d97b9069f99 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -203,9 +203,17 @@ default AppendFiles newFastAppend() {
* Create a new {@link Rollback rollback API} to roll back to a previous snapshot and commit.
*
* @return a new {@link Rollback}
+ * @deprecated Replaced by {@link #manageSnapshots()}
*/
+ @Deprecated
Rollback rollback();
+ /**
+ * Create a new {@link ManageSnapshots manage snapshots API} to manage snapshots in this table and commit.
+ * @return a new {@link ManageSnapshots}
+ */
+ ManageSnapshots manageSnapshots();
+
/**
* Create a new {@link Transaction transaction API} to commit multiple table operations at once.
*
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CherrypickAncestorCommitException.java b/api/src/main/java/org/apache/iceberg/exceptions/CherrypickAncestorCommitException.java
new file mode 100644
index 000000000000..11b875aaf876
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/exceptions/CherrypickAncestorCommitException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.exceptions;
+
+/**
+ * This exception occurs when one cherrypicks an ancestor or when the picked snapshot is already linked to
+ * a published ancestor. This additionally helps avoid duplicate cherrypicks on non-WAP snapshots.
+ */
+public class CherrypickAncestorCommitException extends ValidationException {
+
+ public CherrypickAncestorCommitException(long snapshotId) {
+ super("Cannot cherrypick snapshot %s: already an ancestor", String.valueOf(snapshotId));
+ }
+
+ public CherrypickAncestorCommitException(long snapshotId, long publishedAncestorId) {
+ super("Cannot cherrypick snapshot %s: already picked to create ancestor %s",
+ String.valueOf(snapshotId), String.valueOf(publishedAncestorId));
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/DuplicateWAPCommitException.java b/api/src/main/java/org/apache/iceberg/exceptions/DuplicateWAPCommitException.java
new file mode 100644
index 000000000000..e32f36e4de6f
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/exceptions/DuplicateWAPCommitException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.exceptions;
+
+/**
+ * This exception occurs when the WAP workflow detects a duplicate wap commit. This helps clients
+ * to detect duplicate snapshots that are connected by the same wap id.
+ */
+public class DuplicateWAPCommitException extends ValidationException {
+
+ public DuplicateWAPCommitException(String wapId) {
+ super("Duplicate request to cherry pick wap id that was published already: %s", wapId);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 0d6dd7719863..80603103c36d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -142,6 +142,11 @@ public Rollback rollback() {
throw new UnsupportedOperationException("Cannot roll back a metadata table");
}
+ @Override
+ public ManageSnapshots manageSnapshots() {
+ throw new UnsupportedOperationException("Cannot manage snapshots in a metadata table");
+ }
+
@Override
public Transaction newTransaction() {
throw new UnsupportedOperationException("Cannot create transactions for a metadata table");
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 96326238bff0..c7b1559e42d3 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -163,7 +163,7 @@ private void cacheChanges() {
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
try (CloseableIterable entries = new ManifestGroup(io, changedManifests)
.ignoreExisting()
- .select(ManifestReader.CHANGE_WITH_STATS_COLUMNS)
+ .select(ManifestReader.ALL_COLUMNS)
.entries()) {
for (ManifestEntry entry : entries) {
switch (entry.status()) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 3312a7a1d96b..84a5de642205 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -159,6 +159,11 @@ public Rollback rollback() {
return new RollbackToSnapshot(ops);
}
+ @Override
+ public ManageSnapshots manageSnapshots() {
+ return new SnapshotManager(ops);
+ }
+
@Override
public Transaction newTransaction() {
return Transactions.newTransaction(ops);
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 09539f20fbcd..1e968130f90a 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -585,6 +585,11 @@ public Rollback rollback() {
throw new UnsupportedOperationException("Transaction tables do not support rollback");
}
+ @Override
+ public ManageSnapshots manageSnapshots() {
+ throw new UnsupportedOperationException("Transaction tables do not support managing snapshots");
+ }
+
@Override
public Transaction newTransaction() {
throw new UnsupportedOperationException("Cannot create a transaction within a transaction");
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index e258d4197eaf..bb052ed78ff2 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -49,7 +49,7 @@
public class ManifestReader extends CloseableGroup implements Filterable {
private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);
- private static final ImmutableList ALL_COLUMNS = ImmutableList.of("*");
+ static final ImmutableList ALL_COLUMNS = ImmutableList.of("*");
static final ImmutableList CHANGE_COLUMNS = ImmutableList.of(
"file_path", "file_format", "partition", "record_count", "file_size_in_bytes");
static final ImmutableList CHANGE_WITH_STATS_COLUMNS = ImmutableList.builder()
diff --git a/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java b/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java
index 191f113500c7..9d6c8111a0c5 100644
--- a/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java
@@ -19,61 +19,21 @@
package org.apache.iceberg;
-import com.google.common.base.Preconditions;
-import org.apache.iceberg.exceptions.ValidationException;
-
-class RollbackToSnapshot implements Rollback {
- private final TableOperations ops;
- private TableMetadata base = null;
- private Long targetSnapshotId = null;
+class RollbackToSnapshot extends SnapshotManager implements Rollback {
RollbackToSnapshot(TableOperations ops) {
- this.ops = ops;
- this.base = ops.current(); // do not retry
+ super(ops);
}
@Override
public Rollback toSnapshotId(long snapshotId) {
- Preconditions.checkArgument(base.snapshot(snapshotId) != null,
- "Cannot roll back to unknown snapshot id: %s", snapshotId);
-
- this.targetSnapshotId = snapshotId;
-
+ super.setCurrentSnapshot(snapshotId);
return this;
}
@Override
public Rollback toSnapshotAtTime(long timestampMillis) {
- long snapshotId = 0;
- long snapshotTimestamp = 0;
- // find the latest snapshot by timestamp older than timestampMillis
- for (Snapshot snapshot : base.snapshots()) {
- if (snapshot.timestampMillis() < timestampMillis &&
- snapshot.timestampMillis() > snapshotTimestamp) {
- snapshotId = snapshot.snapshotId();
- snapshotTimestamp = snapshot.timestampMillis();
- }
- }
-
- Preconditions.checkArgument(base.snapshot(snapshotId) != null,
- "Cannot roll back, no valid snapshot older than: %s", timestampMillis);
-
- this.targetSnapshotId = snapshotId;
-
+ super.rollbackToTime(timestampMillis);
return this;
}
-
- @Override
- public Snapshot apply() {
- ValidationException.check(targetSnapshotId != null,
- "Cannot roll back to unknown version: call toSnapshotId or toSnapshotAtTime");
- return base.snapshot(targetSnapshotId);
- }
-
- @Override
- public void commit() {
- // rollback does not refresh or retry. it only operates on the state of the table when rollback
- // was called to create the transaction.
- ops.commit(base, base.rollbackTo(apply()));
- }
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
new file mode 100644
index 000000000000..58a278d62484
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.WapUtil;
+
+public class SnapshotManager extends MergingSnapshotProducer implements ManageSnapshots {
+
+ private enum SnapshotManagerOperation {
+ CHERRYPICK,
+ ROLLBACK
+ }
+
+ private SnapshotManagerOperation managerOperation = null;
+ private Long targetSnapshotId = null;
+ private String snapshotOperation = null;
+
+ SnapshotManager(TableOperations ops) {
+ super(ops);
+ }
+
+ @Override
+ protected ManageSnapshots self() {
+ return this;
+ }
+
+ @Override
+ protected String operation() {
+ // snapshotOperation is used by SnapshotProducer when building and writing a new snapshot for cherrypick
+ Preconditions.checkNotNull(snapshotOperation, "[BUG] Detected uninitialized operation");
+ return snapshotOperation;
+ }
+
+ @Override
+ public ManageSnapshots cherrypick(long snapshotId) {
+ TableMetadata current = current();
+ ValidationException.check(current.snapshot(snapshotId) != null,
+ "Cannot cherry pick unknown snapshot id: %s", snapshotId);
+
+ Snapshot cherryPickSnapshot = current.snapshot(snapshotId);
+ // only append operations are currently supported
+ if (!cherryPickSnapshot.operation().equals(DataOperations.APPEND)) {
+ throw new UnsupportedOperationException("Can cherry pick only append operations");
+ }
+
+ this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
+ this.targetSnapshotId = snapshotId;
+ this.snapshotOperation = cherryPickSnapshot.operation();
+
+ // Pick modifications from the snapshot
+ for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
+ add(addedFile);
+ }
+
+ // this property is set on target snapshot that will get published
+ String wapId = WapUtil.validateWapPublish(current, targetSnapshotId);
+ if (wapId != null) {
+ set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId);
+ }
+
+ // link the snapshot about to be published on commit with the picked snapshot
+ set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(targetSnapshotId));
+
+ return this;
+ }
+
+ @Override
+ public ManageSnapshots setCurrentSnapshot(long snapshotId) {
+ ValidationException.check(current().snapshot(snapshotId) != null,
+ "Cannot roll back to unknown snapshot id: %s", snapshotId);
+
+ this.managerOperation = SnapshotManagerOperation.ROLLBACK;
+ this.targetSnapshotId = snapshotId;
+
+ return this;
+ }
+
+ @Override
+ public ManageSnapshots rollbackToTime(long timestampMillis) {
+ // find the latest snapshot by timestamp older than timestampMillis
+ Snapshot snapshot = findLatestAncestorOlderThan(current(), timestampMillis);
+ Preconditions.checkArgument(snapshot != null,
+ "Cannot roll back, no valid snapshot older than: %s", timestampMillis);
+
+ this.managerOperation = SnapshotManagerOperation.ROLLBACK;
+ this.targetSnapshotId = snapshot.snapshotId();
+
+ return this;
+ }
+
+ @Override
+ public ManageSnapshots rollbackTo(long snapshotId) {
+ TableMetadata current = current();
+ ValidationException.check(current.snapshot(snapshotId) != null,
+ "Cannot roll back to unknown snapshot id: %s", snapshotId);
+ ValidationException.check(
+ isCurrentAncestor(current, snapshotId),
+ "Cannot roll back to snapshot, not an ancestor of the current state: %s", snapshotId);
+ return setCurrentSnapshot(snapshotId);
+ }
+
+ private void validate(TableMetadata base) {
+ validateNonAncestor(base, targetSnapshotId);
+ WapUtil.validateWapPublish(base, targetSnapshotId);
+ }
+
+ @Override
+ public List apply(TableMetadata base) {
+ // this apply method is called by SnapshotProducer, which refreshes the current table state
+ // because the state may have changed in that refresh, the validations must be done here
+ validate(base);
+ return super.apply(base);
+ }
+
+ @Override
+ public Snapshot apply() {
+ TableMetadata base = refresh();
+
+ if (targetSnapshotId == null) {
+ // if no target snapshot was configured then NOOP by returning current state
+ return base.currentSnapshot();
+ }
+
+ switch (managerOperation) {
+ case CHERRYPICK:
+ if (base.snapshot(targetSnapshotId).parentId() != null &&
+ base.currentSnapshot().snapshotId() == base.snapshot(targetSnapshotId).parentId()) {
+ // the snapshot to cherrypick is already based on the current state: fast-forward
+ validate(base);
+ return base.snapshot(targetSnapshotId);
+ } else {
+ // validate(TableMetadata) is called in apply(TableMetadata) after this apply refreshes the table state
+ return super.apply();
+ }
+
+ case ROLLBACK:
+ return base.snapshot(targetSnapshotId);
+
+ default:
+ throw new ValidationException("Invalid SnapshotManagerOperation: only cherrypick, rollback are supported");
+ }
+ }
+
+ private static void validateNonAncestor(TableMetadata meta, long snapshotId) {
+ if (isCurrentAncestor(meta, snapshotId)) {
+ throw new CherrypickAncestorCommitException(snapshotId);
+ }
+
+ Long ancestorId = lookupAncestorBySourceSnapshot(meta, snapshotId);
+ if (ancestorId != null) {
+ throw new CherrypickAncestorCommitException(snapshotId, ancestorId);
+ }
+ }
+
+  private static Long lookupAncestorBySourceSnapshot(TableMetadata meta, long snapshotId) {
+    // find the current ancestor, if any, whose summary names snapshotId as its cherry-pick source
+    String snapshotIdStr = String.valueOf(snapshotId);
+    for (long ancestorId : currentAncestors(meta)) {
+      Map<String, String> summary = meta.snapshot(ancestorId).summary();
+      if (summary != null && snapshotIdStr.equals(summary.get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP))) {
+        return ancestorId;
+      }
+    }
+    return null; // no ancestor was created by cherry-picking this snapshot
+  }
+
+  /**
+   * Return the latest snapshot in the current ancestry whose timestamp is before the provided timestamp.
+   *
+   * @param meta {@link TableMetadata} for a table
+   * @param timestampMillis lookup snapshots before this timestamp
+   * @return the matching {@link Snapshot}, or null if no such snapshot exists
+   */
+  private static Snapshot findLatestAncestorOlderThan(TableMetadata meta, long timestampMillis) {
+    long snapshotTimestamp = 0; // newest qualifying timestamp seen so far
+    Snapshot result = null;
+    for (Long snapshotId : currentAncestors(meta)) {
+      Snapshot snapshot = meta.snapshot(snapshotId);
+      if (snapshot.timestampMillis() < timestampMillis &&
+          snapshot.timestampMillis() > snapshotTimestamp) {
+        result = snapshot;
+        snapshotTimestamp = snapshot.timestampMillis();
+      }
+    }
+    return result;
+  }
+
+  private static List<Long> currentAncestors(TableMetadata meta) {
+    return SnapshotUtil.ancestorIds(meta.currentSnapshot(), meta::snapshot); // ids in the current ancestry
+  }
+
+ private static boolean isCurrentAncestor(TableMetadata meta, long snapshotId) {
+ return currentAncestors(meta).contains(snapshotId);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index a37efe9cacc0..a55bc06f11ac 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -139,7 +139,7 @@ public ThisT deleteWith(Consumer deleteCallback) {
@Override
public Snapshot apply() {
- this.base = ops.refresh();
+ this.base = refresh();
Long parentSnapshotId = base.currentSnapshot() != null ?
base.currentSnapshot().snapshotId() : null;
@@ -221,6 +221,15 @@ private Map summary(TableMetadata previous) {
return builder.build();
}
+ protected TableMetadata current() {
+ return base;
+ }
+
+ protected TableMetadata refresh() {
+ this.base = ops.refresh();
+ return base;
+ }
+
@Override
public void commit() {
// this is always set to the latest commit attempt's snapshot id.
@@ -244,6 +253,12 @@ public void commit() {
updated = base.replaceCurrentSnapshot(newSnapshot);
}
+ if (updated == base) {
+ // do not commit if the metadata has not changed. for example, this may happen when setting the current
+ // snapshot to an ID that is already current. note that this check uses identity.
+ return;
+ }
+
// if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
// to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
taskOps.commit(base, updated.withUUID());
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index 9f422226332d..a6f86175917a 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -34,6 +34,9 @@ public class SnapshotSummary {
public static final String TOTAL_RECORDS_PROP = "total-records";
public static final String DELETED_DUPLICATE_FILES = "deleted-duplicate-files";
public static final String CHANGED_PARTITION_COUNT_PROP = "changed-partition-count";
+ public static final String STAGED_WAP_ID_PROP = "wap.id";
+ public static final String PUBLISHED_WAP_ID_PROP = "published-wap-id";
+ public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
private SnapshotSummary() {
}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index a5081a6b45d9..e44e002bdab4 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -391,6 +391,11 @@ public TableMetadata addStagedSnapshot(Snapshot snapshot) {
}
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
+ // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
+ if (snapshotsById.containsKey(snapshot.snapshotId())) {
+ return setCurrentSnapshotTo(snapshot);
+ }
+
List newSnapshots = ImmutableList.builder()
.addAll(snapshots)
.add(snapshot)
@@ -436,10 +441,15 @@ public TableMetadata removeSnapshotsIf(Predicate removeIf) {
addPreviousFile(file, lastUpdatedMillis));
}
- public TableMetadata rollbackTo(Snapshot snapshot) {
+ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()),
"Cannot set current snapshot to unknown: %s", snapshot.snapshotId());
+ if (currentSnapshotId == snapshot.snapshotId()) {
+ // change is a noop
+ return this;
+ }
+
long nowMillis = System.currentTimeMillis();
List newSnapshotLog = ImmutableList.builder()
.addAll(snapshotLog)
diff --git a/core/src/main/java/org/apache/iceberg/util/WapUtil.java b/core/src/main/java/org/apache/iceberg/util/WapUtil.java
new file mode 100644
index 000000000000..3ecd44a52241
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/WapUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.DuplicateWAPCommitException;
+
+public class WapUtil {
+
+ private WapUtil() {
+ }
+
+ public static String stagedWapId(Snapshot snapshot) {
+ return snapshot.summary() != null ? snapshot.summary().get(SnapshotSummary.STAGED_WAP_ID_PROP) : null;
+ }
+
+ public static String publishedWapId(Snapshot snapshot) {
+ return snapshot.summary() != null ? snapshot.summary().get(SnapshotSummary.PUBLISHED_WAP_ID_PROP) : null;
+ }
+
+ /**
+ * Check if a given staged snapshot's associated wap-id was already published. Does not fail for non-WAP workflows.
+ *
+ * @param current the current {@link TableMetadata metadata} for the target table
+ * @param wapSnapshotId a snapshot id which could have been staged and is associated with a wap id
+ * @return the WAP ID that will be published, if the snapshot has one
+ */
+ public static String validateWapPublish(TableMetadata current, long wapSnapshotId) {
+ Snapshot cherryPickSnapshot = current.snapshot(wapSnapshotId);
+ String wapId = stagedWapId(cherryPickSnapshot);
+ if (wapId != null && !wapId.isEmpty()) {
+ if (WapUtil.isWapIdPublished(current, wapId)) {
+ throw new DuplicateWAPCommitException(wapId);
+ }
+ }
+
+ return wapId;
+ }
+
+ private static boolean isWapIdPublished(TableMetadata current, String wapId) {
+ for (long ancestorId : SnapshotUtil.ancestorIds(current.currentSnapshot(), current::snapshot)) {
+ Snapshot snapshot = current.snapshot(ancestorId);
+ if (wapId.equals(stagedWapId(snapshot)) || wapId.equals(publishedWapId(snapshot))) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java b/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
new file mode 100644
index 000000000000..f11363d04466
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Iterables;
+import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
+import org.apache.iceberg.exceptions.DuplicateWAPCommitException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the write-audit-publish (WAP) workflow: staging snapshots with a {@code wap.id} and then
+ * publishing them, or moving the table's current snapshot, through {@code table.manageSnapshots()}.
+ *
+ * <p>Covers {@code setCurrentSnapshot}, {@code rollbackTo}, {@code rollbackToTime} and {@code cherrypick},
+ * including the failure cases asserted below: rolling back to a non-ancestor, cherry-picking an ancestor,
+ * and cherry-picking a snapshot whose wap id was already published.
+ * <p>NOTE(review): some assertions use literal snapshot ids, assuming sequentially assigned ids; confirm.
+ */
+public class TestWapWorkflow extends TableTestBase {
+
+  // enable write-audit-publish on the test table before every test
+  @Before
+  public void setupTableProperties() {
+    table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit();
+  }
+
+  @Test
+  public void testCurrentSnapshotOperation() {
+    // stage a WAP snapshot, then make it the current snapshot with setCurrentSnapshot
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    long firstSnapshotId = base.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+    base = readMetadata();
+
+    Snapshot wapSnapshot = base.snapshots().get(1);
+
+    Assert.assertEquals("Metadata should have both snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Snapshot should have wap id in summary", "123456789",
+        wapSnapshot.summary().get("wap.id"));
+    Assert.assertEquals("Current snapshot should be first commit's snapshot",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+
+    // do setCurrentSnapshot
+    table.manageSnapshots().setCurrentSnapshot(wapSnapshot.snapshotId()).commit();
+    base = readMetadata();
+
+    Assert.assertEquals("Current snapshot should be what we rolled back to",
+        wapSnapshot.snapshotId(), base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Metadata should have both snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Should contain manifests for both files", 2, base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 2,
+        base.snapshotLog().size());
+  }
+
+  @Test
+  public void testSetCurrentSnapshotNoWAP() {
+    // without WAP staging: setCurrentSnapshot moves the table back to the first of two commits
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    Snapshot firstSnapshot = base.currentSnapshot();
+    long firstSnapshotId = firstSnapshot.snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+    base = readMetadata();
+
+    // do setCurrentSnapshot
+    table.manageSnapshots().setCurrentSnapshot(firstSnapshotId).commit();
+    base = readMetadata();
+
+    Assert.assertEquals("Current snapshot should be what we rolled back to",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Metadata should have both snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Should contain manifest for first file only", 1, base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from first commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 3,
+        base.snapshotLog().size());
+  }
+
+  @Test
+  public void testRollbackOnInvalidNonAncestor() {
+    // rollbackTo must reject a staged snapshot because it is not an ancestor of the current state
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    long firstSnapshotId = base.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+    base = readMetadata();
+
+    Snapshot wapSnapshot = base.snapshots().get(1);
+
+    Assert.assertEquals("Metadata should have both snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Snapshot should have wap id in summary", "123456789",
+        wapSnapshot.summary().get("wap.id"));
+    Assert.assertEquals("Current snapshot should be first commit's snapshot",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+
+    // do rollback
+    AssertHelpers.assertThrows("should fail on invalid snapshot", ValidationException.class,
+        "Cannot roll back to snapshot, not an ancestor of the current state: " + wapSnapshot.snapshotId(),
+        () -> {
+          // rollback to snapshot that is not an ancestor
+          table.manageSnapshots().rollbackTo(wapSnapshot.snapshotId()).commit();
+        });
+    base = readMetadata();
+
+    Assert.assertEquals("Current snapshot should be unchanged after failed rollback", firstSnapshotId,
+        base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Metadata should have both snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Should contain manifests for one snapshot", 1,
+        base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+  }
+
+  @Test
+  public void testRollbackAndCherrypick() {
+    // first snapshot
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    Snapshot firstSnapshot = base.currentSnapshot();
+    long firstSnapshotId = firstSnapshot.snapshotId();
+
+    // second snapshot
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+    base = readMetadata();
+    Snapshot secondSnapshot = base.currentSnapshot();
+
+    // third snapshot
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+    base = readMetadata();
+    Snapshot thirdSnapshot = base.currentSnapshot();
+
+    // rollback to first snapshot
+    table.manageSnapshots().rollbackTo(firstSnapshotId).commit();
+    base = readMetadata();
+    Assert.assertEquals("Should be at first snapshot", firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should have all three snapshots in the system", 3, base.snapshots().size());
+
+    // cherry-pick third snapshot; it is not a child of the current snapshot, so a new snapshot is expected
+    table.manageSnapshots().cherrypick(thirdSnapshot.snapshotId()).commit();
+    base = readMetadata();
+    Assert.assertEquals("Cherry-picking third snapshot should create a new snapshot", 4,
+        base.currentSnapshot().snapshotId());
+
+    // cherry-pick second snapshot on top of the newly created snapshot
+    table.manageSnapshots().cherrypick(secondSnapshot.snapshotId()).commit();
+    base = readMetadata();
+    Assert.assertEquals("Cherry-picking second snapshot should create a new snapshot", 5,
+        base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Count all snapshots", 5,
+        base.snapshots().size());
+  }
+
+  @Test
+  public void testRollbackToTime() {
+    // rollbackToTime with the second snapshot's timestamp should land on the first snapshot
+    // first snapshot
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    Snapshot firstSnapshot = base.currentSnapshot();
+    long firstSnapshotId = firstSnapshot.snapshotId();
+
+    // second snapshot
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+    base = readMetadata();
+    Snapshot secondSnapshot = base.currentSnapshot();
+
+    // third snapshot
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+    base = readMetadata();
+
+    // rollback to before the second snapshot's time
+    table.manageSnapshots().rollbackToTime(secondSnapshot.timestampMillis()).commit();
+    base = readMetadata();
+
+    Assert.assertEquals("Should be at first snapshot", firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should have all three snapshots in the system", 3, base.snapshots().size());
+  }
+
+  @Test
+  public void testWithCherryPicking() {
+    // cherry-picking a staged snapshot whose parent is still current is expected to fast-forward to it
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    long firstSnapshotId = base.currentSnapshot().snapshotId();
+
+    // first WAP commit
+    table.newAppend()
+        .appendFile(FILE_B)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+    base = readMetadata();
+
+    // pick the snapshot that's staged but not committed
+    Snapshot wapSnapshot = base.snapshots().get(1);
+
+    Assert.assertEquals("Should have both snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Should have first wap id in summary", "123456789",
+        wapSnapshot.summary().get("wap.id"));
+    Assert.assertEquals("Current snapshot should be first commit's snapshot",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+
+    // cherry-pick snapshot
+    table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit();
+    base = readMetadata();
+
+    // check if the effective current snapshot is set to the new snapshot created
+    // as a result of the cherry-pick operation
+    Assert.assertEquals("Current snapshot should be fast-forwarded to wap snapshot",
+        wapSnapshot.snapshotId(), base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should have two snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Should contain manifests for both files", 2, base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 2,
+        base.snapshotLog().size());
+  }
+
+  @Test
+  public void testWithTwoPhaseCherryPicking() {
+    // stage two WAP snapshots on the same parent and cherry-pick them one after the other
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    // load current snapshot
+    Snapshot parentSnapshot = base.currentSnapshot();
+    long firstSnapshotId = parentSnapshot.snapshotId();
+
+    // first WAP commit
+    table.newAppend()
+        .appendFile(FILE_B)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+
+    // second WAP commit
+    table.newAppend()
+        .appendFile(FILE_C)
+        .set("wap.id", "987654321")
+        .stageOnly()
+        .commit();
+    base = readMetadata();
+
+    // pick the snapshot that's staged but not committed
+    Snapshot wap1Snapshot = base.snapshots().get(1);
+    Snapshot wap2Snapshot = base.snapshots().get(2);
+
+    Assert.assertEquals("Should have three snapshots", 3, base.snapshots().size());
+    Assert.assertEquals("Should have first wap id in summary", "123456789",
+        wap1Snapshot.summary().get("wap.id"));
+    Assert.assertEquals("Should have second wap id in summary", "987654321",
+        wap2Snapshot.summary().get("wap.id"));
+    Assert.assertEquals("Current snapshot should be first commit's snapshot",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Parent snapshot id should be same for first WAP snapshot",
+        firstSnapshotId, wap1Snapshot.parentId().longValue());
+    Assert.assertEquals("Parent snapshot id should be same for second WAP snapshot",
+        firstSnapshotId, wap2Snapshot.parentId().longValue());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+
+    // load current snapshot
+    parentSnapshot = base.currentSnapshot();
+    // cherry-pick first snapshot
+    table.manageSnapshots().cherrypick(wap1Snapshot.snapshotId()).commit();
+    base = readMetadata();
+
+    // check if the effective current snapshot is set to the new snapshot created
+    // as a result of the cherry-pick operation
+    Assert.assertEquals(
+        "Current snapshot should be set to one after wap snapshot",
+        parentSnapshot.snapshotId() + 1,
+        base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should contain manifests for both files", 2,
+        base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Parent snapshot id should change to latest snapshot before commit",
+        parentSnapshot.snapshotId(), base.currentSnapshot().parentId().longValue());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 2,
+        base.snapshotLog().size());
+
+    // load current snapshot
+    parentSnapshot = base.currentSnapshot();
+    // cherry-pick second snapshot
+    table.manageSnapshots().cherrypick(wap2Snapshot.snapshotId()).commit();
+    base = readMetadata();
+
+    // check if the effective current snapshot is set to the new snapshot created
+    // as a result of the cherry-pick operation
+    Assert.assertEquals("Current snapshot should be set to one after wap snapshot",
+        parentSnapshot.snapshotId() + 1 /* one fast-forwarded snapshot */ + 1,
+        base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should contain manifests for both files", 3, base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Parent snapshot id should change to latest snapshot before commit",
+        parentSnapshot.snapshotId(), base.currentSnapshot().parentId().longValue());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 3,
+        base.snapshotLog().size());
+  }
+
+  @Test
+  public void testWithCommitsBetweenCherryPicking() {
+    // stage two WAP snapshots, commit a regular append, then cherry-pick both staged snapshots
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    // load current snapshot
+    Snapshot parentSnapshot = base.currentSnapshot();
+    long firstSnapshotId = parentSnapshot.snapshotId();
+
+    // first WAP commit
+    table.newAppend()
+        .appendFile(FILE_B)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+
+    // second WAP commit
+    table.newAppend()
+        .appendFile(FILE_C)
+        .set("wap.id", "987654321")
+        .stageOnly()
+        .commit();
+    base = readMetadata();
+
+    // pick the snapshot that's staged but not committed
+    Snapshot wap1Snapshot = base.snapshots().get(1);
+    Snapshot wap2Snapshot = base.snapshots().get(2);
+
+    Assert.assertEquals("Should have three snapshots", 3, base.snapshots().size());
+    Assert.assertEquals("Should have first wap id in summary", "123456789",
+        wap1Snapshot.summary().get("wap.id"));
+    Assert.assertEquals("Should have second wap id in summary", "987654321",
+        wap2Snapshot.summary().get("wap.id"));
+    Assert.assertEquals("Current snapshot should be first commit's snapshot",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Parent snapshot id should be same for first WAP snapshot",
+        firstSnapshotId, wap1Snapshot.parentId().longValue());
+    Assert.assertEquals("Parent snapshot id should be same for second WAP snapshot",
+        firstSnapshotId, wap2Snapshot.parentId().longValue());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+
+    // load current snapshot
+    parentSnapshot = base.currentSnapshot();
+
+    // table has new commit
+    table.newAppend()
+        .appendFile(FILE_D)
+        .commit();
+    base = readMetadata();
+
+    Assert.assertEquals("Should have four snapshots", 4, base.snapshots().size());
+    Assert.assertEquals("Current snapshot should carry over the parent snapshot",
+        parentSnapshot.snapshotId(), base.currentSnapshot().parentId().longValue());
+    Assert.assertEquals("Should contain manifests for two files", 2,
+        base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 2,
+        base.snapshotLog().size());
+
+    // load current snapshot
+    parentSnapshot = base.currentSnapshot();
+    // cherry-pick first snapshot
+    table.manageSnapshots().cherrypick(wap1Snapshot.snapshotId()).commit();
+    base = readMetadata();
+
+    // check if the effective current snapshot is set to the new snapshot created
+    // as a result of the cherry-pick operation
+    Assert.assertEquals("Should have five snapshots", 5, base.snapshots().size());
+    Assert.assertEquals("Current snapshot should be set to one after wap snapshot",
+        parentSnapshot.snapshotId() + 1, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should contain manifests for three files", 3,
+        base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Parent snapshot id should point to same snapshot",
+        parentSnapshot.snapshotId(), base.currentSnapshot().parentId().longValue());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 3,
+        base.snapshotLog().size());
+
+    // load current snapshot
+    parentSnapshot = base.currentSnapshot();
+    // cherry-pick second snapshot
+    table.manageSnapshots().cherrypick(wap2Snapshot.snapshotId()).commit();
+    base = readMetadata();
+
+    // check if the effective current snapshot is set to the new snapshot created
+    // as a result of the cherry-pick operation
+    Assert.assertEquals("Should have all the snapshots", 6, base.snapshots().size());
+    Assert.assertEquals("Current snapshot should be set to one after wap snapshot",
+        parentSnapshot.snapshotId() + 1, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should contain manifests for four files", 4,
+        base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Parent snapshot id should point to same snapshot",
+        parentSnapshot.snapshotId(), base.currentSnapshot().parentId().longValue());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 4,
+        base.snapshotLog().size());
+  }
+
+  @Test
+  public void testWithCherryPickingWithCommitRetry() {
+    // cherry-pick a staged snapshot while table.ops().failCommits(3) forces the commit to be retried
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    // load current snapshot
+    Snapshot parentSnapshot = base.currentSnapshot();
+    long firstSnapshotId = parentSnapshot.snapshotId();
+
+    // first WAP commit
+    table.newAppend()
+        .appendFile(FILE_B)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+
+    base = readMetadata();
+
+    // pick the snapshot that's staged but not committed
+    Snapshot wap1Snapshot = base.snapshots().get(1);
+
+    Assert.assertEquals("Should have two snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Should have first wap id in summary", "123456789",
+        wap1Snapshot.summary().get("wap.id"));
+    Assert.assertEquals("Current snapshot should be first commit's snapshot",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Parent snapshot id should be same for first WAP snapshot",
+        firstSnapshotId, wap1Snapshot.parentId().longValue());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+
+    // load current snapshot
+    base = readMetadata();
+    parentSnapshot = base.currentSnapshot();
+    // cherry-pick first snapshot
+    table.ops().failCommits(3);
+    table.manageSnapshots().cherrypick(wap1Snapshot.snapshotId()).commit();
+    base = readMetadata();
+
+    // check if the effective current snapshot is set to the new snapshot created
+    // as a result of the cherry-pick operation
+    Assert.assertEquals(
+        "Current snapshot should be set to one after wap snapshot",
+        parentSnapshot.snapshotId() + 1,
+        base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should contain manifests for both files", 2,
+        base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should not contain redundant append due to retry", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Parent snapshot id should change to latest snapshot before commit",
+        parentSnapshot.snapshotId(), base.currentSnapshot().parentId().longValue());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 2,
+        base.snapshotLog().size());
+  }
+
+  @Test
+  public void testCherrypickingAncestor() {
+    // after publishing a staged snapshot, cherry-picking an existing ancestor must be rejected
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    long firstSnapshotId = base.currentSnapshot().snapshotId();
+
+    // first WAP commit
+    table.newAppend()
+        .appendFile(FILE_B)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+    base = readMetadata();
+
+    // pick the snapshot that's staged but not committed
+    Snapshot wapSnapshot = base.snapshots().get(1);
+
+    Assert.assertEquals("Should have both snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Should have first wap id in summary", "123456789",
+        wapSnapshot.summary().get("wap.id"));
+    Assert.assertEquals("Current snapshot should be first commit's snapshot",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+
+    // cherry-pick snapshot
+    table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit();
+    base = readMetadata();
+
+    // check if the effective current snapshot is set to the new snapshot created
+    // as a result of the cherry-pick operation
+    Assert.assertEquals("Current snapshot should be fast-forwarded to wap snapshot",
+        wapSnapshot.snapshotId(), base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Should have two snapshots", 2, base.snapshots().size());
+    Assert.assertEquals("Should contain manifests for both files", 2, base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 2,
+        base.snapshotLog().size());
+
+    AssertHelpers.assertThrows("should throw exception", CherrypickAncestorCommitException.class,
+        String.format("Cannot cherrypick snapshot %s: already an ancestor", firstSnapshotId), () -> {
+          // cherry-pick the first snapshot, which is already an ancestor of the current state
+          table.manageSnapshots().cherrypick(firstSnapshotId).commit();
+        }
+    );
+  }
+
+  @Test
+  public void testDuplicateCherrypick() {
+    // two snapshots staged under the same wap.id: only the first cherry-pick may succeed
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    long firstSnapshotId = base.currentSnapshot().snapshotId();
+
+    // stage first WAP commit
+    table.newAppend()
+        .appendFile(FILE_B)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+    // stage second WAP commit with same wap.id
+    table.newAppend()
+        .appendFile(FILE_C)
+        .set("wap.id", "123456789")
+        .stageOnly()
+        .commit();
+    base = readMetadata();
+
+    // pick the snapshot that's staged but not committed
+    Snapshot wapSnapshot1 = base.snapshots().get(1);
+    Snapshot wapSnapshot2 = base.snapshots().get(2);
+
+    Assert.assertEquals("Should have three snapshots", 3, base.snapshots().size());
+    Assert.assertEquals("Should have wap id in first wap snapshot summary", "123456789",
+        wapSnapshot1.summary().get("wap.id"));
+    Assert.assertEquals("Should have wap id in second wap snapshot summary", "123456789",
+        wapSnapshot2.summary().get("wap.id"));
+    Assert.assertEquals("Current snapshot should be first commit's snapshot",
+        firstSnapshotId, base.currentSnapshot().snapshotId());
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 1,
+        base.snapshotLog().size());
+
+    // cherry-pick snapshot
+    table.manageSnapshots().cherrypick(wapSnapshot1.snapshotId()).commit();
+    base = readMetadata();
+
+    Assert.assertEquals("Should have three snapshots", 3, base.snapshots().size());
+    Assert.assertEquals("Should contain manifests for both files", 2, base.currentSnapshot().manifests().size());
+    Assert.assertEquals("Should contain append from last commit", 1,
+        Iterables.size(base.currentSnapshot().addedFiles()));
+    Assert.assertEquals("Snapshot log should indicate number of snapshots committed", 2,
+        base.snapshotLog().size());
+
+    AssertHelpers.assertThrows("should throw exception", DuplicateWAPCommitException.class,
+        String.format("Duplicate request to cherry pick wap id that was published already: %s", 123456789), () -> {
+          // duplicate cherry-pick snapshot
+          table.manageSnapshots().cherrypick(wapSnapshot2.snapshotId()).commit();
+        }
+    );
+  }
+
+  @Test
+  public void testNonWapCherrypick() {
+    // rollback and cherrypick on a table whose snapshots carry no wap.id
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    TableMetadata base = readMetadata();
+    long firstSnapshotId = base.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+    base = readMetadata();
+    long secondSnapshotId = base.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+    base = readMetadata();
+    long thirdSnapshotId = base.currentSnapshot().snapshotId();
+
+    Assert.assertEquals("Should be pointing to third snapshot", thirdSnapshotId,
+        table.currentSnapshot().snapshotId());
+
+    // NOOP commit
+    table.manageSnapshots().commit();
+    Assert.assertEquals("Should still be pointing to third snapshot", thirdSnapshotId,
+        table.currentSnapshot().snapshotId());
+
+    // Rollback to second snapshot
+    table.manageSnapshots().rollbackTo(secondSnapshotId).commit();
+    Assert.assertEquals("Should be pointing to second snapshot", secondSnapshotId,
+        table.currentSnapshot().snapshotId());
+
+    // Cherrypick down to third
+    table.manageSnapshots().cherrypick(thirdSnapshotId).commit();
+    Assert.assertEquals("Should be re-using third snapshot after cherrypick", thirdSnapshotId,
+        table.currentSnapshot().snapshotId());
+
+    // try double cherrypicking of the third snapshot
+    AssertHelpers.assertThrows("should not allow double cherrypick", CherrypickAncestorCommitException.class,
+        String.format("Cannot cherrypick snapshot %s: already an ancestor", thirdSnapshotId), () -> {
+          // double cherrypicking of third snapshot
+          table.manageSnapshots().cherrypick(thirdSnapshotId).commit();
+        });
+
+    // try cherrypicking an ancestor
+    AssertHelpers.assertThrows("should not allow cherrypicking ancestor", CherrypickAncestorCommitException.class,
+        String.format("Cannot cherrypick snapshot %s: already an ancestor", firstSnapshotId), () -> {
+          // cherrypicking of first snapshot, an ancestor of the current state
+          table.manageSnapshots().cherrypick(firstSnapshotId).commit();
+        });
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index d067e507f0f0..701457474110 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -40,6 +40,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -152,7 +153,7 @@ protected void commitOperation(SnapshotUpdate> operation, int numFiles, String
if (isWapTable() && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
- operation.set("wap.id", wapId);
+ operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
operation.stageOnly();
}