10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/TableScan.java
@@ -49,6 +49,16 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
*/
TableScan asOfTime(long timestampMillis);

/**
* Create a new {@link TableScan} from this scan's configuration that will use the most recent
* snapshot as of the given snapshot ref.
*
* @param snapshotRef a snapshot Ref
* @return a new scan based on this with the given snapshot Ref
* @throws IllegalArgumentException if the snapshot cannot be found
*/
TableScan useSnapshotRef(String snapshotRef);
Contributor
I think this should be useRef(String branchOrTagName). The term SnapshotRef is internal and I don't think it should be exposed.

Contributor
I think we need to separate the useBranch and useTag APIs. As you said, refs are internal. From a Spark user perspective we also want to expose only the branch/tag terms; in my opinion the same applies at the API level. Also, since branches can be combined with time travel, we could add a separate API for that, although there's an argument to be made for just combining useBranch + asOfTime.

Contributor
Yeah, I considered that as well. The problem is that the caller doesn't know whether the ref is a tag or a branch before calling the method. That's determined when we look at table metadata and we don't want to force the caller to do that.

There may be a better name than "ref" for useRef. That seems like the problem to me. Maybe we could simplify it to use? I'm not sure that's obvious enough.

@aokolnychyi, do you have any thoughts on the name here?

Contributor Author
@rdblue @amogh-jahagirdar I agree that we can use a common API for tag or branch like useRef.

We have two signatures:

  • useRef(String refName)
  • useRef(String refName, Long timeStampMillis) -> will throw an exception for the tag type, since we can't do time travel for a tag.

Contributor
@amogh-jahagirdar Aug 9, 2022
Sure, this sounds reasonable. The only thing is, if we do useRef (or come up with a better name), then I don't think we would want useRef(String refName, Long timeStampMillis). A user would chain it with the existing asOfTime, i.e. useRef().asOfTime(), and the validation that the ref is a branch would happen in the scan context. I don't think we want the extra method, because time travel only applies to branches, so taking a ref there doesn't make sense when it's really only supported for one ref type, the branch.

If we have consensus on this, then I can update https://github.com/apache/iceberg/pull/5364/files with the updated approach, and this PR could focus on the Spark side of the integration. Will wait to hear what @aokolnychyi suggests as well!

Contributor
> will throw an exception for the tag type, since we can't do time travel for a tag.

In that case I would suggest:

  • useRef(String refName)
  • useBranchAsOfTime(String branchName, Long timeStampMillis)

Contributor
Oh, I see, the alternative is just .useRef(refName).asOfTime(timestampMillis). That also works; in that case, +1 for useRef(String refName).

Contributor
Sounds like there is consensus for useRef.
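
To make the naming discussion concrete, here is a rough sketch of how the proposed useRef API could read from the caller's side. useRef and the chained asOfTime validation are the proposal in this thread, not what the PR currently adds (useSnapshotRef), so the names are tentative:

```java
// Hypothetical usage of the API surface proposed in this thread.
// 'table' is assumed to be an already-loaded org.apache.iceberg.Table.

// Read the snapshot a branch or tag currently points to; the caller does not
// need to know which ref type it is.
TableScan tagScan = table.newScan().useRef("my-tag");

// Time travel within a branch by chaining the existing asOfTime call; per the
// discussion, the check that the ref is actually a branch would happen in the
// scan context, and chaining asOfTime after a tag would be rejected.
TableScan branchScan = table.newScan()
    .useRef("my-branch")
    .asOfTime(1660003200000L); // epoch millis
```

Chaining this way avoids a separate useRef(String, Long) overload that would only ever be valid for branches.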


/**
* Create a new {@link TableScan} from this that will read the given data columns. This produces
* an expected schema that includes all fields that are either selected or used by this scan's
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -94,6 +94,13 @@ public TableScan useSnapshot(long scanSnapshotId) {
tableOps(), table(), tableSchema(), context().useSnapshotId(scanSnapshotId));
}

@Override
public TableScan useSnapshotRef(String snapshotRef) {
Preconditions.checkArgument(table().snapshot(snapshotRef) != null,
"Cannot find ref with name %s", snapshotRef);
return useSnapshot(table().snapshot(snapshotRef).snapshotId());
}

@Override
public TableScan asOfTime(long timestampMillis) {
Preconditions.checkArgument(
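For reference, a minimal usage sketch of the method as it stands in this PR, assuming the table already has a branch or tag named "my-ref"; an unknown ref name fails the precondition above with an IllegalArgumentException:

```java
// Sketch only: 'table' is assumed to be an already-loaded org.apache.iceberg.Table.
TableScan scan = table.newScan().useSnapshotRef("my-ref"); // resolves the ref and scans its snapshot

// A ref that does not exist is rejected up front:
// table.newScan().useSnapshotRef("no-such-ref");
// -> IllegalArgumentException: Cannot find ref with name no-such-ref
```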
@@ -132,6 +132,10 @@ public String parse() {
Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
return parse(Function.identity(), defaultValue);
}

public String parseOptional() {
return parse(Function.identity(), null);
}
}

abstract class ConfParser<ThisT, T> {
@@ -88,6 +88,8 @@ public Long endSnapshotId() {
return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
}

public String snapshotRef() {
  return confParser.stringConf().option(SparkReadOptions.SNAPSHOT_REF).parseOptional();
}

public boolean streamingSkipDeleteSnapshots() {
return confParser
.booleanConf()
@@ -35,6 +35,9 @@ private SparkReadOptions() {}
// A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
public static final String AS_OF_TIMESTAMP = "as-of-timestamp";

// Name of the snapshot ref (branch or tag) whose snapshot will be read
public static final String SNAPSHOT_REF = "snapshot-ref";

// Overrides the table's read.split.target-size and read.split.metadata-target-size
public static final String SPLIT_SIZE = "split-size";

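A minimal sketch of how the new read option is meant to be used from Spark, mirroring the tests later in this PR; the table path and ref name are placeholders:

```java
// Sketch only: 'spark' is assumed to be an existing SparkSession and the path
// points to an Iceberg table that has a tag or branch named "my-tag".
Dataset<Row> df = spark.read()
    .format("iceberg")
    .option(SparkReadOptions.SNAPSHOT_REF, "my-tag") // or the literal key "snapshot-ref"
    .load("/path/to/table");
```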
@@ -41,6 +41,7 @@ class SparkBatchQueryScan extends SparkBatchScan {
private final Long snapshotId;
private final Long startSnapshotId;
private final Long endSnapshotId;
private final String snapshotRef;
private final Long asOfTimestamp;
private final Long splitSize;
private final Integer splitLookback;
@@ -61,22 +62,32 @@ class SparkBatchQueryScan extends SparkBatchScan {

this.snapshotId = readConf.snapshotId();
this.asOfTimestamp = readConf.asOfTimestamp();
this.snapshotRef = readConf.snapshotRef();


if (snapshotId != null && asOfTimestamp != null) {
throw new IllegalArgumentException(
"Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
} else if (snapshotId != null && snapshotRef != null) {
throw new IllegalArgumentException(
"Cannot scan using both snapshot-id and snapshot-ref to select the table snapshot");
} else if (asOfTimestamp != null && snapshotRef != null) {
throw new IllegalArgumentException(
"Cannot scan using both as-of-timestamp and snapshot-ref to select the table snapshot");
}

this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
if (snapshotId != null || asOfTimestamp != null) {
if (snapshotId != null || asOfTimestamp != null || snapshotRef != null) {
if (startSnapshotId != null || endSnapshotId != null) {
throw new IllegalArgumentException(
"Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either "
+ SparkReadOptions.SNAPSHOT_ID
+ " or "
+ SparkReadOptions.AS_OF_TIMESTAMP
+ " is specified");
+ " or "
+ SparkReadOptions.SNAPSHOT_REF
+ " is specified ");
}
} else if (startSnapshotId == null && endSnapshotId != null) {
throw new IllegalArgumentException(
@@ -103,6 +114,10 @@ protected List<CombinedScanTask> tasks() {
scan = scan.asOfTime(asOfTimestamp);
}

if (snapshotRef != null) {
scan = scan.useSnapshotRef(snapshotRef);
}

if (startSnapshotId != null) {
if (endSnapshotId != null) {
scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
@@ -226,4 +226,62 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
.hasMessageContaining("Cannot specify both snapshot-id")
.hasMessageContaining("and as-of-timestamp");
}

@Test
public void testSnapshotSelectionByRef() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();
Contributor Author
Still need to polish these test cases!


HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c")
);
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

// produce the second snapshot
List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
new SimpleRecord(4, "d"),
new SimpleRecord(5, "e"),
new SimpleRecord(6, "f")
);
Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));

// verify records in the current snapshot
table.manageSnapshots().createTag("firstag", table.currentSnapshot().snapshotId()).commit();
Dataset<Row> currentSnapshotResult = spark.read()
.format("iceberg")
.option("snapshot-ref", "firstag")
.load(tableLocation);
currentSnapshotResult.show();
List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
expectedRecords.addAll(secondBatchRecords);
Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);

// verify records in the previous snapshot
Snapshot currentSnapshot = table.currentSnapshot();
Long parentSnapshotId = currentSnapshot.parentId();
table.manageSnapshots().createTag("secondtag", parentSnapshotId).commit();
Dataset<Row> previousSnapshotResult = spark.read()
.format("iceberg")
.option("snapshot-ref", "secondtag")
.load(tableLocation);
previousSnapshotResult.show();
List<SimpleRecord> previousSnapshotRecords = previousSnapshotResult.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
Assert.assertEquals("Previous snapshot rows should match", firstBatchRecords, previousSnapshotRecords);
}
}
@@ -94,6 +94,8 @@ public Long endSnapshotId() {
return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
}

public String snapshotRef() {
  return confParser.stringConf().option(SparkReadOptions.SNAPSHOT_REF).parseOptional();
}

public String fileScanTaskSetId() {
return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
}
@@ -35,6 +35,9 @@ private SparkReadOptions() {}
// A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
public static final String AS_OF_TIMESTAMP = "as-of-timestamp";

// Name of the snapshot ref (branch or tag) whose snapshot will be read
public static final String SNAPSHOT_REF = "snapshot-ref";

// Overrides the table's read.split.target-size and read.split.metadata-target-size
public static final String SPLIT_SIZE = "split-size";

@@ -67,6 +67,7 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
private final Long startSnapshotId;
private final Long endSnapshotId;
private final Long asOfTimestamp;
private final String snapshotRef;
private final List<Expression> runtimeFilterExpressions;

private Set<Integer> specIds = null; // lazy cache of scanned spec IDs
@@ -88,6 +89,7 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
this.asOfTimestamp = readConf.asOfTimestamp();
this.snapshotRef = readConf.snapshotRef();
this.runtimeFilterExpressions = Lists.newArrayList();

if (scan == null) {
@@ -269,7 +271,8 @@ && readSchema().equals(that.readSchema())
&& Objects.equals(snapshotId, that.snapshotId)
&& Objects.equals(startSnapshotId, that.startSnapshotId)
&& Objects.equals(endSnapshotId, that.endSnapshotId)
&& Objects.equals(asOfTimestamp, that.asOfTimestamp);
&& Objects.equals(asOfTimestamp, that.asOfTimestamp)
&& Objects.equals(snapshotRef, that.snapshotRef);
}

@Override
@@ -282,7 +285,8 @@ public int hashCode() {
snapshotId,
startSnapshotId,
endSnapshotId,
asOfTimestamp);
asOfTimestamp,
snapshotRef);
}

@Override
@@ -182,17 +182,29 @@ private Schema schemaWithMetadataColumns() {
public Scan build() {
Long snapshotId = readConf.snapshotId();
Long asOfTimestamp = readConf.asOfTimestamp();
String snapshotRef = readConf.snapshotRef();

Preconditions.checkArgument(
snapshotId == null || asOfTimestamp == null,
"Cannot set both %s and %s to select which table snapshot to scan",
SparkReadOptions.SNAPSHOT_ID,
SparkReadOptions.AS_OF_TIMESTAMP);
Preconditions.checkArgument(
snapshotId == null || snapshotRef == null,
"Cannot set both %s and %s to select which table snapshot to scan",
SparkReadOptions.SNAPSHOT_ID,
SparkReadOptions.SNAPSHOT_REF);
Preconditions.checkArgument(
asOfTimestamp == null || snapshotRef == null,
"Cannot set both %s and %s to select which table snapshot to scan",
SparkReadOptions.AS_OF_TIMESTAMP,
SparkReadOptions.SNAPSHOT_REF);


Long startSnapshotId = readConf.startSnapshotId();
Long endSnapshotId = readConf.endSnapshotId();

if (snapshotId != null || asOfTimestamp != null) {
if (snapshotId != null || asOfTimestamp != null || snapshotRef != null) {
Preconditions.checkArgument(
startSnapshotId == null && endSnapshotId == null,
"Cannot set %s and %s for incremental scans when either %s or %s is set",
@@ -225,6 +237,10 @@ public Scan build() {
scan = scan.asOfTime(asOfTimestamp);
}

if (snapshotRef != null) {
scan = scan.useSnapshotRef(snapshotRef);
}

if (startSnapshotId != null) {
if (endSnapshotId != null) {
scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
@@ -226,4 +226,62 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
.hasMessageContaining("Cannot specify both snapshot-id")
.hasMessageContaining("and as-of-timestamp");
}

@Test
public void testSnapshotSelectionByRef() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();
Contributor Author
Still need to polish these test cases!


HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c")
);
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

// produce the second snapshot
List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
new SimpleRecord(4, "d"),
new SimpleRecord(5, "e"),
new SimpleRecord(6, "f")
);
Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));

// verify records in the current snapshot
table.manageSnapshots().createTag("firstag", table.currentSnapshot().snapshotId()).commit();
Dataset<Row> currentSnapshotResult = spark.read()
.format("iceberg")
.option("snapshot-ref", "firstag")
.load(tableLocation);
currentSnapshotResult.show();
List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
expectedRecords.addAll(secondBatchRecords);
Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);

// verify records in the previous snapshot
Snapshot currentSnapshot = table.currentSnapshot();
Long parentSnapshotId = currentSnapshot.parentId();
table.manageSnapshots().createTag("secondtag", parentSnapshotId).commit();
Dataset<Row> previousSnapshotResult = spark.read()
.format("iceberg")
.option("snapshot-ref", "secondtag")
.load(tableLocation);
previousSnapshotResult.show();
List<SimpleRecord> previousSnapshotRecords = previousSnapshotResult.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
Assert.assertEquals("Previous snapshot rows should match", firstBatchRecords, previousSnapshotRecords);
}
}