Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method org.apache.iceberg.RowDelta org.apache.iceberg.RowDelta::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)"
justification: "Deprecations for 1.0 release"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)"
justification: "Adding table scan APIs to support scanning from refs"
release-base-0.13.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
16 changes: 14 additions & 2 deletions api/src/main/java/org/apache/iceberg/TableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,25 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
*/
TableScan useSnapshot(long snapshotId);

/**
* Create a new {@link TableScan} from this scan's configuration that will use the given
* reference.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use the snapshot ID of the given reference?

*
* @param ref reference
* @return a new scan based on the given reference.
* @throws IllegalArgumentException if a reference with the given name could not be found
*/
TableScan useRef(String ref);

/**
* Create a new {@link TableScan} from this scan's configuration that will use the most recent
* snapshot as of the given time in milliseconds.
* snapshot as of the given time in milliseconds on the branch in the scan or main if no branch is
* set.
*
* @param timestampMillis a timestamp in milliseconds.
* @return a new scan based on this with the current snapshot at the given time
* @throws IllegalArgumentException if the snapshot cannot be found
* @throws IllegalArgumentException if the snapshot cannot be found or time travel is attempted on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the doc change here is no longer needed?

* a tag
*/
TableScan asOfTime(long timestampMillis);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public TableScan useSnapshot(long scanSnapshotId) {
throw new UnsupportedOperationException("Cannot select snapshot in table: " + tableType());
}

@Override
public TableScan useRef(String ref) {
throw new UnsupportedOperationException("Cannot select ref in table: " + tableType());
}

@Override
public TableScan asOfTime(long timestampMillis) {
throw new UnsupportedOperationException("Cannot select snapshot in table: " + tableType());
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TableScan appendsAfter(long fromSnapshotId) {
@Override
public TableScan useSnapshot(long scanSnapshotId) {
Preconditions.checkArgument(
snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId());
snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "set to snapshot id=%s"?

Preconditions.checkArgument(
tableOps().current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s",
Expand All @@ -96,10 +96,20 @@ public TableScan useSnapshot(long scanSnapshotId) {
tableOps(), table(), tableSchema(), context().useSnapshotId(scanSnapshotId));
}

@Override
public TableScan useRef(String name) {
Preconditions.checkArgument(
snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
Snapshot snapshot = table().snapshot(name);
Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
return newRefinedScan(
tableOps(), table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId()));
}

@Override
public TableScan asOfTime(long timestampMillis) {
Preconditions.checkArgument(
snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId());
snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());

return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public TableScan asOfTime(long timestampMillis) {
timestampMillis, context().fromSnapshotId(), context().toSnapshotId()));
}

@Override
public TableScan useRef(String ref) {
throw new UnsupportedOperationException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems fine for now, but we may have a case where we want incremental to be able to specify a branch. It doesn't matter currently because the snapshot IDs are always explicit.

String.format(
"Cannot scan table using ref %s: configured for incremental data in snapshots (%s, %s]",
ref, context().fromSnapshotId(), context().toSnapshotId()));
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
throw new UnsupportedOperationException(
Expand Down
105 changes: 105 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
package org.apache.iceberg;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -86,4 +90,105 @@ protected DeleteFile newDeleteFile(String partitionPath) {
.withRecordCount(10)
.build();
}

@Test
public void testScanFromBranchTip() throws IOException {
table.newFastAppend().appendFile(FILE_A).commit();
// Add B and C to new branch
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).toBranch("testBranch").commit();
// Add D to main
table.newFastAppend().appendFile(FILE_D).commit();

TableScan testBranchScan = table.newScan().useRef("testBranch");
validateExpectedFileScanTasks(
testBranchScan, ImmutableList.of(FILE_A.path(), FILE_B.path(), FILE_C.path()));

TableScan mainScan = table.newScan();
validateExpectedFileScanTasks(mainScan, ImmutableList.of(FILE_A.path(), FILE_D.path()));
}

@Test
public void testScanFromTag() throws IOException {
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();
table.newFastAppend().appendFile(FILE_C).commit();
TableScan tagScan = table.newScan().useRef("tagB");
validateExpectedFileScanTasks(tagScan, ImmutableList.of(FILE_A.path(), FILE_B.path()));
TableScan mainScan = table.newScan();
validateExpectedFileScanTasks(
mainScan, ImmutableList.of(FILE_A.path(), FILE_B.path(), FILE_C.path()));
}

@Test
public void testScanFromRefWhenSnapshotSetFails() {
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();

AssertHelpers.assertThrows(
"Should throw when attempting to use a ref for scanning when a snapshot is set",
IllegalArgumentException.class,
"Cannot override ref, already set snapshot id=1",
() -> table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).useRef("tagB"));
}

@Test
public void testSettingSnapshotWhenRefSetFails() {
table.newFastAppend().appendFile(FILE_A).commit();
Snapshot snapshotA = table.currentSnapshot();
table.newFastAppend().appendFile(FILE_B).commit();
table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();

AssertHelpers.assertThrows(
"Should throw when attempting to use a snapshot for scanning when a ref is set",
IllegalArgumentException.class,
"Cannot override snapshot, already set snapshot id=2",
() -> table.newScan().useRef("tagB").useSnapshot(snapshotA.snapshotId()));
}

@Test
public void testBranchTimeTravelFails() {
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
table
.manageSnapshots()
.createBranch("testBranch", table.currentSnapshot().snapshotId())
.commit();
AssertHelpers.assertThrows(
"Should throw when attempting to use a snapshot for scanning when a ref is set",
IllegalArgumentException.class,
"Cannot override snapshot, already set snapshot id=1",
() -> table.newScan().useRef("testBranch").asOfTime(System.currentTimeMillis()));
}

@Test
public void testSettingMultipleRefsFails() {
table.newFastAppend().appendFile(FILE_A).commit();
table.manageSnapshots().createTag("tagA", table.currentSnapshot().snapshotId()).commit();
table.newFastAppend().appendFile(FILE_B).commit();
table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();

AssertHelpers.assertThrows(
"Should throw when attempting to use multiple refs",
IllegalArgumentException.class,
"Cannot override ref, already set snapshot id=2",
() -> table.newScan().useRef("tagB").useRef("tagA"));
}

@Test
public void testSettingInvalidRefFails() {
AssertHelpers.assertThrows(
"Should throw when attempting to use an invalid ref for scanning",
IllegalArgumentException.class,
"Cannot find ref nonexisting",
() -> table.newScan().useRef("nonexisting"));
}

private void validateExpectedFileScanTasks(
TableScan scan, List<CharSequence> expectedFileScanPaths) throws IOException {
try (CloseableIterable<FileScanTask> scanTasks = scan.planFiles()) {
Assert.assertEquals(expectedFileScanPaths.size(), Iterables.size(scanTasks));
List<CharSequence> actualFiles = Lists.newArrayList();
scanTasks.forEach(task -> actualFiles.add(task.file().path()));
Assert.assertTrue(actualFiles.containsAll(expectedFileScanPaths));
}
}
}