diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java index c1742f82ca84..9243eae10790 100644 --- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java +++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java @@ -60,4 +60,10 @@ public interface SnapshotUpdate extends PendingUpdate { * @return this for method chaining */ ThisT scanManifestsWith(ExecutorService executorService); + + /** + * Perform operations on a particular branch; implementations without branch support may throw UnsupportedOperationException. + * @param branch name of the SnapshotRef of type branch to operate on + */ + ThisT toBranch(String branch); } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index a13089ff9ada..20cc1cd95255 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -114,6 +114,21 @@ public FastAppend appendManifest(ManifestFile manifest) { return this; } + @Override + public FastAppend toBranch(String branch) { + Preconditions.checkArgument(branch != null, "branch cannot be null"); + if (ops.current().ref(branch) == null) { + super.createNewRef(branch); + } + + Preconditions.checkArgument(ops.current() + .ref(branch).type() + .equals(SnapshotRefType.BRANCH), + "%s is not a ref to type branch", branch); + setTargetBranch(branch); + return this; + } + private ManifestFile copyManifest(ManifestFile manifest) { TableMetadata current = ops.current(); InputFile toCopy = ops.io().newInputFile(manifest.path()); @@ -125,6 +140,8 @@ private ManifestFile copyManifest(ManifestFile manifest) { @Override public List apply(TableMetadata base) { List newManifests = Lists.newArrayList(); + Snapshot current = base.ref(getTargetBranch()) != null ? 
+ base.snapshot(base.ref(getTargetBranch()).snapshotId()) : base.currentSnapshot(); try { ManifestFile manifest = writeManifest(); @@ -140,8 +157,8 @@ public List apply(TableMetadata base) { manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); Iterables.addAll(newManifests, appendManifestsWithMetadata); - if (base.currentSnapshot() != null) { - newManifests.addAll(base.currentSnapshot().allManifests(ops.io())); + if (current != null) { + newManifests.addAll(current.allManifests(ops.io())); } return newManifests; diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index eab82b48ce50..f9f5a146244c 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -88,6 +88,7 @@ public void accept(String file) { private Consumer deleteFunc = defaultDelete; private ExecutorService workerPool = ThreadPools.getWorkerPool(); + private String targetBranch = SnapshotRef.MAIN_BRANCH; protected SnapshotProducer(TableOperations ops) { this.ops = ops; @@ -116,6 +117,38 @@ public ThisT scanManifestsWith(ExecutorService executorService) { return self(); } + @Override + public ThisT toBranch(String branch) { + throw new UnsupportedOperationException("Performing operations on a branch is currently not supported"); + } + + /** + * Creates a branch ref at the current snapshot and commits it immediately; used by snapshot producer operations when the requested branch does not exist. NOTE(review): currentSnapshot() is dereferenced without a null check - confirm behavior on a table with no snapshots. + * @param branch ref name on which the operation is to be performed + */ + protected void createNewRef(String branch) { + SnapshotRef branchRef = SnapshotRef.branchBuilder(this.current().currentSnapshot().snapshotId()).build(); + TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(this.current()); + updatedBuilder.setRef(branch, branchRef); + ops.commit(ops.current(), updatedBuilder.build()); + } + + /** + * A setter for the target branch on which snapshot producer 
operation should be performed + * @param branch name of the branch to set as the target + */ + protected void setTargetBranch(String branch) { + this.targetBranch = branch; + } + + /** + * A getter for the target branch on which the snapshot producer operation should be performed + * @return target branch, SnapshotRef.MAIN_BRANCH unless changed via setTargetBranch + */ + protected String getTargetBranch() { + return targetBranch; + } + protected ExecutorService workerPool() { return this.workerPool; } @@ -167,8 +200,7 @@ protected void validate(TableMetadata currentMetadata) { @Override public Snapshot apply() { refresh(); - Long parentSnapshotId = base.currentSnapshot() != null ? - base.currentSnapshot().snapshotId() : null; + Long parentSnapshotId = base.ref(targetBranch) != null ? base.ref(targetBranch).snapshotId() : null; long sequenceNumber = base.nextSequenceNumber(); // run validations from the child operation @@ -298,11 +330,11 @@ public void commit() { TableMetadata.Builder update = TableMetadata.buildFrom(base); if (base.snapshot(newSnapshot.snapshotId()) != null) { // this is a rollback operation - update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH); + update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch); } else if (stageOnly) { update.addSnapshot(newSnapshot); } else { - update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH); + update.setBranchSnapshot(newSnapshot, targetBranch); } TableMetadata updated = update.build(); diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index e0ffacd73778..d5281b09ad2e 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import 
org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.Assert; @@ -478,4 +479,72 @@ public void testIncludedPartitionSummaryLimit() { String changedPartitions = table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); Assert.assertEquals("Should set changed partition count", "2", changedPartitions); } + + @Test + public void testAppendToBranch() throws UnsupportedOperationException { + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + TableMetadata branchBase = readMetadata(); + Long branchSnapshotStart = table.currentSnapshot().snapshotId(); + + table.newFastAppend() + .appendFile(FILE_C) + .commit(); + + TableMetadata base = readMetadata(); + + table.newFastAppend() + .appendFile(FILE_D) + .commit(); + + Iterable allManifests = table.currentSnapshot().allManifests(table.io()); + Assert.assertEquals(3, Iterables.size(allManifests)); + validateSnapshot(base.currentSnapshot(), table.currentSnapshot(), 3, FILE_D); + + table.manageSnapshots().createBranch("ref", branchSnapshotStart).commit(); + + table.newFastAppend() + .toBranch("ref") + .appendFile(FILE_B).commit(); + + Snapshot branchCurrentSnapshot = table.snapshot(table.ops().current().ref("ref").snapshotId()); + + Assert.assertEquals(branchSnapshotStart, branchCurrentSnapshot.parentId()); + validateSnapshot(branchBase.currentSnapshot(), branchCurrentSnapshot, 4, FILE_B); + + Iterable branchManifests = branchCurrentSnapshot.allManifests(table.io()); + Assert.assertEquals(2, Iterables.size(branchManifests)); + + TableMetadata newBase = readMetadata(); + table.newFastAppend() + .appendFile(FILE_A2) + .commit(); + validateSnapshot(newBase.currentSnapshot(), table.currentSnapshot(), 5, FILE_A2); + } + + @Test(expected = IllegalArgumentException.class) + public void testAppendToNullBranch() { + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + 
table.manageSnapshots().createBranch("ref", table.currentSnapshot().snapshotId()).commit(); + table.newFastAppend() + .toBranch(null) + .appendFile(FILE_B) + .commit(); + } + + @Test + public void testAppendToInValidBranch() { + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + table.manageSnapshots().createBranch("ref", table.currentSnapshot().snapshotId()).commit(); + table.newFastAppend().appendFile(FILE_B).toBranch("newBranch").commit(); + Assert.assertNotNull(table.ops().current().ref("newBranch")); + } }