diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java index 7ccd9cf7d23d..2d94fda8cec3 100644 --- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java +++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java @@ -62,6 +62,7 @@ public interface SnapshotUpdate extends PendingUpdate { /** * Perform operations on a particular branch + * * @param branch which is name of SnapshotRef of type branch. */ ThisT toBranch(String branch); diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index bbb51fdc7e3e..a073d79e5552 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -105,7 +105,7 @@ public OverwriteFiles validateNoConflictingDeletes() { } @Override - protected void validate(TableMetadata base) { + protected void validate(TableMetadata base, Snapshot snapshot) { if (validateAddedFilesMatchOverwriteFilter) { PartitionSpec spec = dataSpec(); Expression rowFilter = rowFilter(); diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java index 2847f5ceca6b..dd44505e9d39 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java @@ -80,7 +80,7 @@ public ReplacePartitions validateNoConflictingData() { } @Override - public void validate(TableMetadata currentMetadata) { + public void validate(TableMetadata currentMetadata, Snapshot snapshot) { if (validateConflictingData) { if (dataSpec().isUnpartitioned()) { validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue()); @@ -101,14 +101,14 @@ public void validate(TableMetadata currentMetadata) { } @Override - public List apply(TableMetadata base) { + public List 
apply(TableMetadata base, Snapshot snapshot) { if (dataSpec().fields().size() <= 0) { // replace all data in an unpartitioned table deleteByRowFilter(Expressions.alwaysTrue()); } try { - return super.apply(base); + return super.apply(base, snapshot); } catch (ManifestFilterManager.DeleteException e) { throw new ValidationException( "Cannot commit file that conflicts with existing partition: %s", e.partition()); diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index 1bc846e27602..8a3b137b2d3d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -110,7 +110,7 @@ public RewriteFiles validateFromSnapshot(long snapshotId) { } @Override - protected void validate(TableMetadata base) { + protected void validate(TableMetadata base, Snapshot snapshot) { if (replacedDataFiles.size() > 0) { // if there are replaced data files, there cannot be any new row-level deletes for those data // files diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index c61b99dcfc65..816bc0c8a7ec 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -168,7 +168,7 @@ private ManifestFile copyManifest(ManifestFile manifest) { } @Override - public List apply(TableMetadata base) { + public List apply(TableMetadata base, Snapshot snapshot) { List currentManifests = base.currentSnapshot().dataManifests(ops.io()); Set currentManifestSet = ImmutableSet.copyOf(currentManifests); diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index 35a04ba39493..50a0e26ab368 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ 
b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -96,7 +96,7 @@ public RowDelta validateNoConflictingDeleteFiles() { } @Override - protected void validate(TableMetadata base) { + protected void validate(TableMetadata base, Snapshot snapshot) { if (base.currentSnapshot() != null) { if (!referencedDataFiles.isEmpty()) { validateDataFilesExist( diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java index 77de54279768..3786b1185be6 100644 --- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java +++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java @@ -152,7 +152,7 @@ public Object updateEvent() { } @Override - protected void validate(TableMetadata base) { + protected void validate(TableMetadata base, Snapshot snapshot) { // this is only called after apply() passes off to super, but check fast-forward status just in // case if (!isFastForward(base)) { diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index febdcee633e8..f3955e15f6ce 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -97,6 +97,12 @@ public FastAppend appendFile(DataFile file) { return this; } + @Override + public FastAppend toBranch(String branch) { + targetBranch(branch); + return this; + } + @Override public FastAppend appendManifest(ManifestFile manifest) { Preconditions.checkArgument( @@ -135,7 +141,7 @@ private ManifestFile copyManifest(ManifestFile manifest) { } @Override - public List apply(TableMetadata base) { + public List apply(TableMetadata base, Snapshot snapshot) { List newManifests = Lists.newArrayList(); try { @@ -153,8 +159,8 @@ public List apply(TableMetadata base) { manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); Iterables.addAll(newManifests, appendManifestsWithMetadata); - 
if (base.currentSnapshot() != null) { - newManifests.addAll(base.currentSnapshot().allManifests(ops.io())); + if (snapshot != null) { + newManifests.addAll(snapshot.allManifests(ops.io())); } return newManifests; diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 789c6c23c32b..b82244f0714f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -758,13 +758,11 @@ protected Map summary() { } @Override - public List apply(TableMetadata base) { - Snapshot current = base.currentSnapshot(); - + public List apply(TableMetadata base, Snapshot snapshot) { // filter any existing manifests List filtered = filterManager.filterManifests( - base.schema(), current != null ? current.dataManifests(ops.io()) : null); + base.schema(), snapshot != null ? snapshot.dataManifests(ops.io()) : null); long minDataSequenceNumber = filtered.stream() .map(ManifestFile::minSequenceNumber) @@ -777,7 +775,7 @@ public List apply(TableMetadata base) { deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber); List filteredDeletes = deleteFilterManager.filterManifests( - base.schema(), current != null ? current.deleteManifests(ops.io()) : null); + base.schema(), snapshot != null ? 
snapshot.deleteManifests(ops.io()) : null); // only keep manifests that have live data files or that were written by this commit Predicate shouldKeep = diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 4242f771ff2a..50642ae59279 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -116,36 +116,26 @@ public ThisT scanManifestsWith(ExecutorService executorService) { @Override public ThisT toBranch(String branch) { - throw new UnsupportedOperationException("Performing operations on a branch is currently not supported"); + throw new UnsupportedOperationException( + String.format( + "Cannot commit to branch %s: %s does not support branch commits", + branch, this.getClass().getName())); } - /*** - * Will be used by snapshot producer operations to create a new ref if an invalid branch is passed - * @param branch ref name on which operation is to performed - */ - protected void createNewRef(String branch) { - SnapshotRef branchRef = SnapshotRef.branchBuilder(this.current().currentSnapshot().snapshotId()).build(); - TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(this.current()); - updatedBuilder.setRef(branch, branchRef); - ops.commit(ops.current(), updatedBuilder.build()); - } - - /*** - * A setter for the target branch on which snapshot producer operation should be performed + /** + * A setter for the target branch on which snapshot producer operation should be performed + * * @param branch to set as target branch */ - protected void setTargetBranch(String branch) { + protected void targetBranch(String branch) { + Preconditions.checkArgument(branch != null, "Invalid branch name: null"); + boolean refExists = base.ref(branch) != null; + Preconditions.checkArgument( + !refExists || base.ref(branch).isBranch(), + "%s is a tag, not a branch.
Tags cannot be targets for producing snapshots", branch); this.targetBranch = branch; } - /*** - * A getter for the target branch on which snapshot producer operation should be performed - * @return target branch - */ - protected String getTargetBranch() { - return targetBranch; - } - protected ExecutorService workerPool() { return this.workerPool; } @@ -183,27 +173,37 @@ public ThisT deleteWith(Consumer deleteCallback) { *

Child operations can override this to add custom validation. * * @param currentMetadata current table metadata to validate + * @param snapshot ending snapshot on the lineage which is being validated */ - protected void validate(TableMetadata currentMetadata) {} + protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {} /** * Apply the update's changes to the base table metadata and return the new manifest list. * * @param metadataToUpdate the base table metadata to apply changes to + * @param snapshot parent snapshot the changes are applied on top of, or null if there is none * @return a manifest list for the new snapshot. */ - protected abstract List apply(TableMetadata metadataToUpdate); + protected abstract List apply(TableMetadata metadataToUpdate, Snapshot snapshot); @Override public Snapshot apply() { refresh(); - Long parentSnapshotId = base.ref(targetBranch) != null ? base.ref(targetBranch).snapshotId() : null; - long sequenceNumber = base.nextSequenceNumber(); + Snapshot parentSnapshot = base.currentSnapshot(); + if (targetBranch != null) { + SnapshotRef branch = base.ref(targetBranch); + if (branch != null) { + parentSnapshot = base.snapshot(branch.snapshotId()); + } else if (base.currentSnapshot() != null) { + parentSnapshot = base.currentSnapshot(); + } + } - // run validations from the child operation - validate(base); + long sequenceNumber = base.nextSequenceNumber(); + Long parentSnapshotId = parentSnapshot == null ?
null : parentSnapshot.snapshotId(); - List manifests = apply(base); + validate(base, parentSnapshot); + List manifests = apply(base, parentSnapshot); if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) { @@ -362,18 +362,19 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .run(taskOps -> { - Snapshot newSnapshot = apply(); - newSnapshotId.set(newSnapshot.snapshotId()); - TableMetadata.Builder update = TableMetadata.buildFrom(base); - if (base.snapshot(newSnapshot.snapshotId()) != null) { - // this is a rollback operation - update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch); - } else if (stageOnly) { - update.addSnapshot(newSnapshot); - } else { - update.setBranchSnapshot(newSnapshot, targetBranch); - } + .run( + taskOps -> { + Snapshot newSnapshot = apply(); + newSnapshotId.set(newSnapshot.snapshotId()); + TableMetadata.Builder update = TableMetadata.buildFrom(base); + if (base.snapshot(newSnapshot.snapshotId()) != null) { + // this is a rollback operation + update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch); + } else if (stageOnly) { + update.addSnapshot(newSnapshot); + } else { + update.setBranchSnapshot(newSnapshot, targetBranch); + } TableMetadata updated = update.build(); if (updated.changes().isEmpty()) { diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index cb57ece4472f..508c90255e72 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -488,12 +488,53 @@ public void testIncludedPartitionSummaryLimit() { Assert.assertEquals("Should set changed partition count", "2", changedPartitions); } - @Test(expected = UnsupportedOperationException.class) - public void testAppendToBranch() throws 
UnsupportedOperationException { - table.newFastAppend() - .appendFile(FILE_A) - .commit(); - table.manageSnapshots().createBranch("ref", table.currentSnapshot().snapshotId()).commit(); - table.newFastAppend().appendFile(FILE_B).toBranch("ref").commit(); + @Test + public void testAppendToExistingBranch() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); + table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit(); + int branchSnapshot = 2; + + Assert.assertEquals(1, table.currentSnapshot().snapshotId()); + Assert.assertEquals(branchSnapshot, table.ops().current().ref("branch").snapshotId()); + } + + @Test + public void testAppendCreatesBranchIfNeeded() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit(); + int branchSnapshot = 2; + + Assert.assertEquals(1, table.currentSnapshot().snapshotId()); + Assert.assertNotNull(table.ops().current().ref("branch")); + Assert.assertEquals(branchSnapshot, table.ops().current().ref("branch").snapshotId()); + } + + @Test + public void testAppendToBranchEmptyTable() { + table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit(); + int branchSnapshot = 1; + + Assert.assertNull(table.currentSnapshot()); + Assert.assertNotNull(table.ops().current().ref("branch")); + Assert.assertEquals(branchSnapshot, table.ops().current().ref("branch").snapshotId()); + } + + @Test + public void testAppendToNullBranchFails() { + AssertHelpers.assertThrows( + "Invalid branch", + IllegalArgumentException.class, + () -> table.newFastAppend().appendFile(FILE_A).toBranch(null)); + } + + @Test + public void testAppendToTagFails() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.manageSnapshots().createTag("some-tag", table.currentSnapshot().snapshotId()).commit(); + AssertHelpers.assertThrows( + "Invalid branch", + IllegalArgumentException.class, + ()
-> table.newFastAppend().appendFile(FILE_A).toBranch("some-tag").commit()); } } diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java index f7788fbe3221..082f6396bde7 100644 --- a/core/src/test/java/org/apache/iceberg/TestOverwrite.java +++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java @@ -300,4 +300,18 @@ public void testValidatedOverwriteWithAppendSuccess() { Assert.assertEquals( "Should not create a new snapshot", baseId, table.currentSnapshot().snapshotId()); } + + @Test + public void testOverwriteToBranchUnsupported() { + AssertHelpers.assertThrows( + "Should reject committing overwrite to branch", + UnsupportedOperationException.class, + "Cannot commit to branch someBranch: org.apache.iceberg.BaseOverwriteFiles does not support branch commits", + () -> + table + .newOverwrite() + .overwriteByRowFilter(and(equal("date", "2018-06-09"), lessThan("id", 20))) + .addFile(FILE_10_TO_14) + .toBranch("someBranch")); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java index 9c73d2d9576b..d5007bf6de06 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java +++ b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java @@ -742,4 +742,13 @@ public void testValidateOnlyDeletes() { public void testEmptyPartitionPathWithUnpartitionedTable() { DataFiles.builder(PartitionSpec.unpartitioned()).withPartitionPath(""); } + + @Test + public void testReplacePartitionsOnBranchUnsupported() { + AssertHelpers.assertThrows( + "Should reject committing replace partitions to branch", + UnsupportedOperationException.class, + "Cannot commit to branch someBranch: org.apache.iceberg.BaseReplacePartitions does not support branch commits", + () -> table.newReplacePartitions().addFile(FILE_UNPARTITIONED_A).toBranch("someBranch")); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index 175c80c4d1e0..633b27241ded 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -1088,6 +1088,20 @@ public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOE Assert.assertTrue("New manifest should not be deleted", new File(newManifest.path()).exists()); } + @Test + public void testRewriteManifestsOnBranchUnsupported() { + + table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + Assert.assertEquals(1, table.currentSnapshot().allManifests(table.io()).size()); + + AssertHelpers.assertThrows( + "Should reject committing rewrite manifests to branch", + UnsupportedOperationException.class, + "Cannot commit to branch someBranch: org.apache.iceberg.BaseRewriteManifests does not support branch commits", + () -> table.rewriteManifests().toBranch("someBranch").commit()); + } + private void validateSummary( Snapshot snapshot, int replaced, int kept, int created, int entryCount) { Map summary = snapshot.summary(); diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 6ebb92eb865a..e2929b470994 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -1412,4 +1412,20 @@ public void testRowDeltaCaseSensitivity() { .validateNoConflictingDeleteFiles() .commit()); } + + @Test + public void testRowDeltaToBranchUnsupported() { + AssertHelpers.assertThrows( + "Should reject committing row delta to branch", + UnsupportedOperationException.class, + "Cannot commit to branch someBranch: org.apache.iceberg.BaseRowDelta does not support branch commits", + () -> + table + .newRowDelta() + .caseSensitive(false) + .addRows(FILE_B) + .addDeletes(FILE_A2_DELETES) + .toBranch("someBranch") + .commit()); + } }