Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {

/**
* Perform operations on a particular branch
*
* @param branch which is name of SnapshotRef of type branch.
*/
ThisT toBranch(String branch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public OverwriteFiles validateNoConflictingDeletes() {
}

@Override
protected void validate(TableMetadata base) {
protected void validate(TableMetadata base, Snapshot snapshot) {
if (validateAddedFilesMatchOverwriteFilter) {
PartitionSpec spec = dataSpec();
Expression rowFilter = rowFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public ReplacePartitions validateNoConflictingData() {
}

@Override
public void validate(TableMetadata currentMetadata) {
public void validate(TableMetadata currentMetadata, Snapshot snapshot) {
if (validateConflictingData) {
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
Expand All @@ -101,14 +101,14 @@ public void validate(TableMetadata currentMetadata) {
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
if (dataSpec().fields().size() <= 0) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
}

try {
return super.apply(base);
return super.apply(base, snapshot);
} catch (ManifestFilterManager.DeleteException e) {
throw new ValidationException(
"Cannot commit file that conflicts with existing partition: %s", e.partition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public RewriteFiles validateFromSnapshot(long snapshotId) {
}

@Override
protected void validate(TableMetadata base) {
protected void validate(TableMetadata base, Snapshot snapshot) {
if (replacedDataFiles.size() > 0) {
// if there are replaced data files, there cannot be any new row-level deletes for those data
// files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private ManifestFile copyManifest(ManifestFile manifest) {
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> currentManifests = base.currentSnapshot().dataManifests(ops.io());
Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/BaseRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public RowDelta validateNoConflictingDeleteFiles() {
}

@Override
protected void validate(TableMetadata base) {
protected void validate(TableMetadata base, Snapshot snapshot) {
if (base.currentSnapshot() != null) {
if (!referencedDataFiles.isEmpty()) {
validateDataFilesExist(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public Object updateEvent() {
}

@Override
protected void validate(TableMetadata base) {
protected void validate(TableMetadata base, Snapshot snapshot) {
// this is only called after apply() passes off to super, but check fast-forward status just in
// case
if (!isFastForward(base)) {
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ public FastAppend appendFile(DataFile file) {
return this;
}

@Override
public FastAppend toBranch(String branch) {
targetBranch(branch);
return this;
}

@Override
public FastAppend appendManifest(ManifestFile manifest) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -135,7 +141,7 @@ private ManifestFile copyManifest(ManifestFile manifest) {
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> newManifests = Lists.newArrayList();

try {
Expand All @@ -153,8 +159,8 @@ public List<ManifestFile> apply(TableMetadata base) {
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
Iterables.addAll(newManifests, appendManifestsWithMetadata);

if (base.currentSnapshot() != null) {
newManifests.addAll(base.currentSnapshot().allManifests(ops.io()));
if (snapshot != null) {
newManifests.addAll(snapshot.allManifests(ops.io()));
}

return newManifests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,13 +758,11 @@ protected Map<String, String> summary() {
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
Snapshot current = base.currentSnapshot();

public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
// filter any existing manifests
List<ManifestFile> filtered =
filterManager.filterManifests(
base.schema(), current != null ? current.dataManifests(ops.io()) : null);
base.schema(), snapshot != null ? snapshot.dataManifests(ops.io()) : null);
long minDataSequenceNumber =
filtered.stream()
.map(ManifestFile::minSequenceNumber)
Expand All @@ -777,7 +775,7 @@ public List<ManifestFile> apply(TableMetadata base) {
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
List<ManifestFile> filteredDeletes =
deleteFilterManager.filterManifests(
base.schema(), current != null ? current.deleteManifests(ops.io()) : null);
base.schema(), snapshot != null ? snapshot.deleteManifests(ops.io()) : null);

// only keep manifests that have live data files or that were written by this commit
Predicate<ManifestFile> shouldKeep =
Expand Down
85 changes: 43 additions & 42 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,36 +116,26 @@ public ThisT scanManifestsWith(ExecutorService executorService) {

@Override
public ThisT toBranch(String branch) {
throw new UnsupportedOperationException("Performing operations on a branch is currently not supported");
throw new UnsupportedOperationException(
String.format(
"Cannot commit to branch %s: %s does not support branch commits",
branch, this.getClass().getName()));
}

/***
* Will be used by snapshot producer operations to create a new ref if an invalid branch is passed
* @param branch ref name on which operation is to performed
*/
protected void createNewRef(String branch) {
SnapshotRef branchRef = SnapshotRef.branchBuilder(this.current().currentSnapshot().snapshotId()).build();
TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(this.current());
updatedBuilder.setRef(branch, branchRef);
ops.commit(ops.current(), updatedBuilder.build());
}

/***
* A setter for the target branch on which snapshot producer operation should be performed
/**
* * A setter for the target branch on which snapshot producer operation should be performed
*
* @param branch to set as target branch
*/
protected void setTargetBranch(String branch) {
protected void targetBranch(String branch) {
Preconditions.checkArgument(branch != null, "Invalid branch name: null");
boolean refExists = base.ref(branch) != null;
Preconditions.checkArgument(
!refExists || base.ref(branch).isBranch(),
"%s is a tag, not a branch. Tags cannot be targets for producing snapshots");
this.targetBranch = branch;
}

/***
* A getter for the target branch on which snapshot producer operation should be performed
* @return target branch
*/
protected String getTargetBranch() {
return targetBranch;
}

protected ExecutorService workerPool() {
return this.workerPool;
}
Expand Down Expand Up @@ -183,27 +173,37 @@ public ThisT deleteWith(Consumer<String> deleteCallback) {
* <p>Child operations can override this to add custom validation.
*
* @param currentMetadata current table metadata to validate
* @param snapshot ending snapshot on the lineage which is being validated
*/
protected void validate(TableMetadata currentMetadata) {}
protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}

/**
* Apply the update's changes to the base table metadata and return the new manifest list.
*
* @param metadataToUpdate the base table metadata to apply changes to
* @param snapshot
* @return a manifest list for the new snapshot.
*/
protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate);
protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate, Snapshot snapshot);

@Override
public Snapshot apply() {
refresh();
Long parentSnapshotId = base.ref(targetBranch) != null ? base.ref(targetBranch).snapshotId() : null;
long sequenceNumber = base.nextSequenceNumber();
Snapshot parentSnapshot = base.currentSnapshot();
if (targetBranch != null) {
SnapshotRef branch = base.ref(targetBranch);
if (branch != null) {
parentSnapshot = base.snapshot(branch.snapshotId());
} else if (base.currentSnapshot() != null) {
parentSnapshot = base.currentSnapshot();
}
}

// run validations from the child operation
validate(base);
long sequenceNumber = base.nextSequenceNumber();
Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();

List<ManifestFile> manifests = apply(base);
validate(base, parentSnapshot);
List<ManifestFile> manifests = apply(base, parentSnapshot);

if (base.formatVersion() > 1
|| base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
Expand Down Expand Up @@ -362,18 +362,19 @@ public void commit() {
base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(taskOps -> {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch);
} else if (stageOnly) {
update.addSnapshot(newSnapshot);
} else {
update.setBranchSnapshot(newSnapshot, targetBranch);
}
.run(
taskOps -> {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch);
} else if (stageOnly) {
update.addSnapshot(newSnapshot);
} else {
update.setBranchSnapshot(newSnapshot, targetBranch);
}

TableMetadata updated = update.build();
if (updated.changes().isEmpty()) {
Expand Down
55 changes: 48 additions & 7 deletions core/src/test/java/org/apache/iceberg/TestFastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,53 @@ public void testIncludedPartitionSummaryLimit() {
Assert.assertEquals("Should set changed partition count", "2", changedPartitions);
}

@Test(expected = UnsupportedOperationException.class)
public void testAppendToBranch() throws UnsupportedOperationException {
table.newFastAppend()
.appendFile(FILE_A)
.commit();
table.manageSnapshots().createBranch("ref", table.currentSnapshot().snapshotId()).commit();
table.newFastAppend().appendFile(FILE_B).toBranch("ref").commit();
@Test
public void testAppendToExistingBranch() {
table.newFastAppend().appendFile(FILE_A).commit();
table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
int branchSnapshot = 2;

Assert.assertEquals(table.currentSnapshot().snapshotId(), 1);
Assert.assertEquals(table.ops().current().ref("branch").snapshotId(), branchSnapshot);
}

@Test
public void testAppendCreatesBranchIfNeeded() {
table.newFastAppend().appendFile(FILE_A).commit();
table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
int branchSnapshot = 2;

Assert.assertEquals(table.currentSnapshot().snapshotId(), 1);
Assert.assertNotNull(table.ops().current().ref("branch"));
Assert.assertEquals(table.ops().current().ref("branch").snapshotId(), branchSnapshot);
}

@Test
public void testAppendToBranchEmptyTable() {
table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
int branchSnapshot = 1;

Assert.assertNull(table.currentSnapshot());
Assert.assertNotNull(table.ops().current().ref("branch"));
Assert.assertEquals(table.ops().current().ref("branch").snapshotId(), branchSnapshot);
}

@Test
public void testAppendToNullBranchFails() {
AssertHelpers.assertThrows(
"Invalid branch",
IllegalArgumentException.class,
() -> table.newFastAppend().appendFile(FILE_A).toBranch(null));
}

@Test
public void testAppendToTagFails() {
table.newFastAppend().appendFile(FILE_A).commit();
table.manageSnapshots().createTag("some-tag", table.currentSnapshot().snapshotId()).commit();
AssertHelpers.assertThrows(
"Invalid branch",
IllegalArgumentException.class,
() -> table.newFastAppend().appendFile(FILE_A).toBranch("some-tag").commit());
}
}
13 changes: 13 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestOverwrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,17 @@ public void testValidatedOverwriteWithAppendSuccess() {
Assert.assertEquals(
"Should not create a new snapshot", baseId, table.currentSnapshot().snapshotId());
}

@Test
public void testOverwriteToBranchUnsupported() {
AssertHelpers.assertThrows(
"Cannot commit to branch someBranch: org.apache.iceberg.BaseOverwriteFiles does not support branch commits",
UnsupportedOperationException.class,
() ->
table
.newOverwrite()
.overwriteByRowFilter(and(equal("date", "2018-06-09"), lessThan("id", 20)))
.addFile(FILE_10_TO_14)
.toBranch("someBranch"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -742,4 +742,13 @@ public void testValidateOnlyDeletes() {
public void testEmptyPartitionPathWithUnpartitionedTable() {
DataFiles.builder(PartitionSpec.unpartitioned()).withPartitionPath("");
}

@Test
public void testReplacePartitionsOnBranchUnsupported() {
AssertHelpers.assertThrows(
"Should reject committing rewrite manifests to branch",
UnsupportedOperationException.class,
"Cannot commit to branch someBranch: org.apache.iceberg.BaseReplacePartitions does not support branch commits",
() -> table.newReplacePartitions().addFile(FILE_UNPARTITIONED_A).toBranch("someBranch"));
}
}
14 changes: 14 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,20 @@ public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOE
Assert.assertTrue("New manifest should not be deleted", new File(newManifest.path()).exists());
}

@Test
public void testRewriteManifestsOnBranchUnsupported() {

table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();

Assert.assertEquals(1, table.currentSnapshot().allManifests(table.io()).size());

AssertHelpers.assertThrows(
"Should reject committing rewrite manifests to branch",
UnsupportedOperationException.class,
"Cannot commit to branch someBranch: org.apache.iceberg.BaseRewriteManifests does not support branch commits",
() -> table.rewriteManifests().toBranch("someBranch").commit());
}

private void validateSummary(
Snapshot snapshot, int replaced, int kept, int created, int entryCount) {
Map<String, String> summary = snapshot.summary();
Expand Down
16 changes: 16 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -1412,4 +1412,20 @@ public void testRowDeltaCaseSensitivity() {
.validateNoConflictingDeleteFiles()
.commit());
}

@Test
public void testRowDeltaToBranchUnsupported() {
AssertHelpers.assertThrows(
"Should reject committing row delta to branch",
UnsupportedOperationException.class,
"Cannot commit to branch someBranch: org.apache.iceberg.BaseRowDelta does not support branch commits",
() ->
table
.newRowDelta()
.caseSensitive(false)
.addRows(FILE_B)
.addDeletes(FILE_A2_DELETES)
.toBranch("someBranch")
.commit());
}
}