diff --git a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java index 81caf3a58de3..2fa60472da5e 100644 --- a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java @@ -83,6 +83,19 @@ public interface ManageSnapshots extends PendingUpdate { */ ManageSnapshots cherrypick(long snapshotId); + /** + * Create a new branch. The branch will point to current snapshot if the current snapshot is not + * NULL. Otherwise, the branch will point to a newly created empty snapshot. + * + * @param name branch name + * @return this for method chaining + * @throws IllegalArgumentException if a branch with the given name already exists + */ + default ManageSnapshots createBranch(String name) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement createBranch(String)"); + } + /** * Create a new branch pointing to the given snapshot id. * diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java index f9015c04b882..75ccad017781 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java @@ -68,6 +68,18 @@ public ManageSnapshots rollbackTo(long snapshotId) { return this; } + @Override + public ManageSnapshots createBranch(String name) { + Snapshot currentSnapshot = transaction.currentMetadata().currentSnapshot(); + if (currentSnapshot != null) { + return createBranch(name, currentSnapshot.snapshotId()); + } + + // Create an empty snapshot for the branch + transaction.newFastAppend().toBranch(name).commit(); + return this; + } + @Override public ManageSnapshots createBranch(String name, long snapshotId) { updateSnapshotReferencesOperation().createBranch(name, snapshotId); diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java index ba4e85f494ef..67a383583ee0 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java @@ -230,6 +230,38 @@ public void testCreateBranch() { && expectedBranch.equals(SnapshotRef.branchBuilder(snapshotId).build())); } + @Test + public void testCreateBranchWithoutSnapshotId() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + // Test a basic case of creating a branch + table.manageSnapshots().createBranch("branch1").commit(); + SnapshotRef actualBranch = table.ops().refresh().ref("branch1"); + Assertions.assertThat(actualBranch).isNotNull(); + Assertions.assertThat(actualBranch).isEqualTo(SnapshotRef.branchBuilder(snapshotId).build()); + } + + @Test + public void testCreateBranchOnEmptyTable() { + table.manageSnapshots().createBranch("branch1").commit(); + + SnapshotRef mainSnapshotRef = table.ops().refresh().ref(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainSnapshotRef).isNull(); + + SnapshotRef branch1SnapshotRef = table.ops().refresh().ref("branch1"); + Assertions.assertThat(branch1SnapshotRef).isNotNull(); + Assertions.assertThat(branch1SnapshotRef.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(branch1SnapshotRef.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(branch1SnapshotRef.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(branch1SnapshotRef.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); + } + @Test public void testCreateBranchFailsWhenRefAlreadyExists() { table.newAppend().appendFile(FILE_A).commit(); diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala index d4328d4b9227..2be406e7f344 100644 --- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala +++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala @@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec( .map(java.lang.Long.valueOf) .orNull - Preconditions.checkArgument(snapshotId != null, - "Cannot complete create or replace branch operation on %s, main has no snapshot", ident) - val manageSnapshots = iceberg.table().manageSnapshots() val refExists = null != iceberg.table().refs().get(branch) + def safeCreateBranch(): Unit = { + if (snapshotId == null) { + manageSnapshots.createBranch(branch) + } else { + manageSnapshots.createBranch(branch, snapshotId) + } + } + if (create && replace && !refExists) { - manageSnapshots.createBranch(branch, snapshotId) + safeCreateBranch() } else if (replace) { + Preconditions.checkArgument(snapshotId != null, + "Cannot complete replace branch operation on %s, main has no snapshot", ident) manageSnapshots.replaceBranch(branch, snapshotId) } else { if (refExists && ifNotExists) { return Nil } - manageSnapshots.createBranch(branch, snapshotId) + safeCreateBranch() } if (branchOptions.numSnapshots.nonEmpty) { diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 0f00603bb42b..a6bf194b3df5 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -92,11 +93,25 @@ public void testCreateBranch() throws NoSuchTableException { @Test public void testCreateBranchOnEmptyTable() { - Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1")) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining( - "Cannot complete create or replace branch operation on %s, main has no snapshot", - tableName); + String branchName = "b1"; + sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"); + Table table = validationCatalog.loadTable(tableIdent); + + SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainRef).isNull(); + + SnapshotRef ref = table.refs().get(branchName); + Assertions.assertThat(ref).isNotNull(); + Assertions.assertThat(ref.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(ref.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(ref.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); } @Test @@ -308,6 +323,29 @@ public void createOrReplace() throws NoSuchTableException { assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second); } + @Test + public void testCreateOrReplaceBranchOnEmptyTable() { + String branchName = "b1"; + sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, "b1"); + Table table = validationCatalog.loadTable(tableIdent); + + SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainRef).isNull(); + + SnapshotRef ref = table.refs().get(branchName); + Assertions.assertThat(ref).isNotNull(); + Assertions.assertThat(ref.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(ref.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(ref.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); + } + @Test public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException { Table table = insertRows();