Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/ManageSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ public interface ManageSnapshots extends PendingUpdate<Snapshot> {
*/
ManageSnapshots cherrypick(long snapshotId);

/**
* Create a new branch. The branch will point to current snapshot if the current snapshot is not
* NULL. Otherwise, the branch will point to a newly created empty snapshot.
*
* @param name branch name
* @return this for method chaining
* @throws IllegalArgumentException if a branch with the given name already exists
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have a test for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me do a follow-up.

Copy link
Contributor Author

@ConeyLiu ConeyLiu Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue I opened #8197 for a follow-up. And I am sorry, this does miss the checking.

*/
default ManageSnapshots createBranch(String name) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement createBranch(String)");
}

/**
* Create a new branch pointing to the given snapshot id.
*
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public ManageSnapshots rollbackTo(long snapshotId) {
return this;
}

@Override
public ManageSnapshots createBranch(String name) {
Snapshot currentSnapshot = transaction.currentMetadata().currentSnapshot();
if (currentSnapshot != null) {
return createBranch(name, currentSnapshot.snapshotId());
}

// Create an empty snapshot for the branch
transaction.newFastAppend().toBranch(name).commit();
return this;
}

@Override
public ManageSnapshots createBranch(String name, long snapshotId) {
updateSnapshotReferencesOperation().createBranch(name, snapshotId);
Expand Down
32 changes: 32 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,38 @@ public void testCreateBranch() {
&& expectedBranch.equals(SnapshotRef.branchBuilder(snapshotId).build()));
}

@Test
public void testCreateBranchWithoutSnapshotId() {
table.newAppend().appendFile(FILE_A).commit();
long snapshotId = table.currentSnapshot().snapshotId();
// Test a basic case of creating a branch
table.manageSnapshots().createBranch("branch1").commit();
SnapshotRef actualBranch = table.ops().refresh().ref("branch1");
Assertions.assertThat(actualBranch).isNotNull();
Assertions.assertThat(actualBranch).isEqualTo(SnapshotRef.branchBuilder(snapshotId).build());
}

@Test
public void testCreateBranchOnEmptyTable() {
table.manageSnapshots().createBranch("branch1").commit();

SnapshotRef mainSnapshotRef = table.ops().refresh().ref(SnapshotRef.MAIN_BRANCH);
Assertions.assertThat(mainSnapshotRef).isNull();

SnapshotRef branch1SnapshotRef = table.ops().refresh().ref("branch1");
Assertions.assertThat(branch1SnapshotRef).isNotNull();
Assertions.assertThat(branch1SnapshotRef.minSnapshotsToKeep()).isNull();
Assertions.assertThat(branch1SnapshotRef.maxSnapshotAgeMs()).isNull();
Assertions.assertThat(branch1SnapshotRef.maxRefAgeMs()).isNull();

Snapshot snapshot = table.snapshot(branch1SnapshotRef.snapshotId());
Assertions.assertThat(snapshot.parentId()).isNull();
Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
}

@Test
public void testCreateBranchFailsWhenRefAlreadyExists() {
table.newAppend().appendFile(FILE_A).commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec(
.map(java.lang.Long.valueOf)
.orNull

Preconditions.checkArgument(snapshotId != null,
"Cannot complete create or replace branch operation on %s, main has no snapshot", ident)

val manageSnapshots = iceberg.table().manageSnapshots()
val refExists = null != iceberg.table().refs().get(branch)

def safeCreateBranch(): Unit = {
if (snapshotId == null) {
manageSnapshots.createBranch(branch)
} else {
manageSnapshots.createBranch(branch, snapshotId)
}
}

if (create && replace && !refExists) {
manageSnapshots.createBranch(branch, snapshotId)
safeCreateBranch()
} else if (replace) {
Preconditions.checkArgument(snapshotId != null,
"Cannot complete replace branch operation on %s, main has no snapshot", ident)
manageSnapshots.replaceBranch(branch, snapshotId)
} else {
if (refExists && ifNotExists) {
return Nil
}

manageSnapshots.createBranch(branch, snapshotId)
safeCreateBranch()
}

if (branchOptions.numSnapshots.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -92,11 +93,25 @@ public void testCreateBranch() throws NoSuchTableException {

@Test
public void testCreateBranchOnEmptyTable() {
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Cannot complete create or replace branch operation on %s, main has no snapshot",
tableName);
String branchName = "b1";
sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1");
Table table = validationCatalog.loadTable(tableIdent);

SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
Assertions.assertThat(mainRef).isNull();

SnapshotRef ref = table.refs().get(branchName);
Assertions.assertThat(ref).isNotNull();
Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
Assertions.assertThat(ref.maxRefAgeMs()).isNull();

Snapshot snapshot = table.snapshot(ref.snapshotId());
Assertions.assertThat(snapshot.parentId()).isNull();
Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
}

@Test
Expand Down Expand Up @@ -308,6 +323,29 @@ public void createOrReplace() throws NoSuchTableException {
assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second);
}

@Test
public void testCreateOrReplaceBranchOnEmptyTable() {
String branchName = "b1";
sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, "b1");
Table table = validationCatalog.loadTable(tableIdent);

SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
Assertions.assertThat(mainRef).isNull();

SnapshotRef ref = table.refs().get(branchName);
Assertions.assertThat(ref).isNotNull();
Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
Assertions.assertThat(ref.maxRefAgeMs()).isNull();

Snapshot snapshot = table.snapshot(ref.snapshotId());
Assertions.assertThat(snapshot.parentId()).isNull();
Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
}

@Test
public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException {
Table table = insertRows();
Expand Down