Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,10 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
* @return this for method chaining
*/
ThisT scanManifestsWith(ExecutorService executorService);

/**
* Perform operations on a particular branch
* @param branch which is name of SnapshotRef of type branch.
*/
ThisT toBranch(String branch);
}
21 changes: 19 additions & 2 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ public FastAppend appendManifest(ManifestFile manifest) {
return this;
}

@Override
public FastAppend toBranch(String branch) {
Preconditions.checkArgument(branch != null, "branch cannot be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Message should be Invalid branch name: null

if (ops.current().ref(branch) == null) {
super.createNewRef(branch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be handled by SnapshotProducer, as I noted on #4926.

}

Preconditions.checkArgument(ops.current()
.ref(branch).type()
.equals(SnapshotRefType.BRANCH),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this check need to be split across 3 lines? Or is that autoformatting doing it?

"%s is not a ref to type branch", branch);
Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar Aug 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also now that we're creating a branch if it doesn't exist, on commit, we should do a null check. It's okay if the branch is null, since we'll create it. if it's not null, it must be an actual branch. I think this should also reside in snapshot producer as well.

setTargetBranch(branch);
return this;
}

private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest.path());
Expand All @@ -125,6 +140,8 @@ private ManifestFile copyManifest(ManifestFile manifest) {
@Override
public List<ManifestFile> apply(TableMetadata base) {
List<ManifestFile> newManifests = Lists.newArrayList();
Snapshot current = base.ref(getTargetBranch()) != null ?
base.snapshot(base.ref(getTargetBranch()).snapshotId()) : base.currentSnapshot();

try {
ManifestFile manifest = writeManifest();
Expand All @@ -140,8 +157,8 @@ public List<ManifestFile> apply(TableMetadata base) {
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
Iterables.addAll(newManifests, appendManifestsWithMetadata);

if (base.currentSnapshot() != null) {
newManifests.addAll(base.currentSnapshot().allManifests(ops.io()));
if (current != null) {
newManifests.addAll(current.allManifests(ops.io()));
}

return newManifests;
Expand Down
40 changes: 36 additions & 4 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void accept(String file) {
private Consumer<String> deleteFunc = defaultDelete;

private ExecutorService workerPool = ThreadPools.getWorkerPool();
private String targetBranch = SnapshotRef.MAIN_BRANCH;

protected SnapshotProducer(TableOperations ops) {
this.ops = ops;
Expand Down Expand Up @@ -116,6 +117,38 @@ public ThisT scanManifestsWith(ExecutorService executorService) {
return self();
}

@Override
public ThisT toBranch(String branch) {
throw new UnsupportedOperationException("Performing operations on a branch is currently not supported");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the error wording looks a bit confusing to me. Are we planning to support the toBranch method in this class later or is it not envisioned to be supported on this particular class at all? thx!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimas-b , In the PR i have implemented the toBranch only in one of the subclasses of Snapshot producer aka FastAppend class. This is done in order to avoid creating a huge PR and be thorough in testing each of the child operations of snapshot producer. In future my aim is to incrementally raise PR for each of them.

Copy link
Contributor

@dimas-b dimas-b Jul 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification, @namrathamyske ! Would you mind updating the message to something like Performing SnapshotProducer operations on a branch is currently not supported to avoid making a (false) impression that it is not supported in "general"?.. or use different wording as you see fit. My only concern is that getting the old message in one context might be interpreted that it is not supported anywhere.

}

/***
* Will be used by snapshot producer operations to create a new ref if an invalid branch is passed
* @param branch ref name on which operation is to performed
*/
protected void createNewRef(String branch) {
SnapshotRef branchRef = SnapshotRef.branchBuilder(this.current().currentSnapshot().snapshotId()).build();
TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(this.current());
updatedBuilder.setRef(branch, branchRef);
ops.commit(ops.current(), updatedBuilder.build());
}

/***
* A setter for the target branch on which snapshot producer operation should be performed
* @param branch to set as target branch
*/
protected void setTargetBranch(String branch) {
this.targetBranch = branch;
}

/***
* A getter for the target branch on which snapshot producer operation should be performed
* @return target branch
*/
protected String getTargetBranch() {
return targetBranch;
}

protected ExecutorService workerPool() {
return this.workerPool;
}
Expand Down Expand Up @@ -167,8 +200,7 @@ protected void validate(TableMetadata currentMetadata) {
@Override
public Snapshot apply() {
refresh();
Long parentSnapshotId = base.currentSnapshot() != null ?
base.currentSnapshot().snapshotId() : null;
Long parentSnapshotId = base.ref(targetBranch) != null ? base.ref(targetBranch).snapshotId() : null;
long sequenceNumber = base.nextSequenceNumber();

// run validations from the child operation
Expand Down Expand Up @@ -298,11 +330,11 @@ public void commit() {
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH);
update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch);
} else if (stageOnly) {
update.addSnapshot(newSnapshot);
} else {
update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH);
update.setBranchSnapshot(newSnapshot, targetBranch);
}

TableMetadata updated = update.build();
Expand Down
69 changes: 69 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.Assert;
Expand Down Expand Up @@ -478,4 +479,72 @@ public void testIncludedPartitionSummaryLimit() {
String changedPartitions = table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP);
Assert.assertEquals("Should set changed partition count", "2", changedPartitions);
}

@Test
public void testAppendToBranch() throws UnsupportedOperationException {
table.newFastAppend()
.appendFile(FILE_A)
.commit();

TableMetadata branchBase = readMetadata();
Long branchSnapshotStart = table.currentSnapshot().snapshotId();

table.newFastAppend()
.appendFile(FILE_C)
.commit();

TableMetadata base = readMetadata();

table.newFastAppend()
.appendFile(FILE_D)
.commit();

Iterable<ManifestFile> allManifests = table.currentSnapshot().allManifests(table.io());
Assert.assertEquals(3, Iterables.size(allManifests));
validateSnapshot(base.currentSnapshot(), table.currentSnapshot(), 3, FILE_D);

table.manageSnapshots().createBranch("ref", branchSnapshotStart).commit();

table.newFastAppend()
.toBranch("ref")
.appendFile(FILE_B).commit();

Snapshot branchCurrentSnapshot = table.snapshot(table.ops().current().ref("ref").snapshotId());

Assert.assertEquals(branchSnapshotStart, branchCurrentSnapshot.parentId());
validateSnapshot(branchBase.currentSnapshot(), branchCurrentSnapshot, 4, FILE_B);

Iterable<ManifestFile> branchManifests = branchCurrentSnapshot.allManifests(table.io());
Assert.assertEquals(2, Iterables.size(branchManifests));

TableMetadata newBase = readMetadata();
table.newFastAppend()
.appendFile(FILE_A2)
.commit();
validateSnapshot(newBase.currentSnapshot(), table.currentSnapshot(), 5, FILE_A2);
}

@Test(expected = IllegalArgumentException.class)
public void testAppendToNullBranch() {
table.newFastAppend()
.appendFile(FILE_A)
.commit();

table.manageSnapshots().createBranch("ref", table.currentSnapshot().snapshotId()).commit();
table.newFastAppend()
.toBranch(null)
.appendFile(FILE_B)
.commit();
}

@Test
public void testAppendToInValidBranch() {
table.newFastAppend()
.appendFile(FILE_A)
.commit();

table.manageSnapshots().createBranch("ref", table.currentSnapshot().snapshotId()).commit();
table.newFastAppend().appendFile(FILE_B).toBranch("newBranch").commit();
Assert.assertNotNull(table.ops().current().ref("newBranch"));
}
}