Merged · Changes from all commits (24 commits)
@@ -94,6 +94,14 @@ public Long endSnapshotId() {
return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
}

public String branch() {
return confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
}

public String tag() {
return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
}

public String fileScanTaskSetId() {
return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
}
@@ -35,6 +35,12 @@ private SparkReadOptions() {}
// A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
public static final String AS_OF_TIMESTAMP = "as-of-timestamp";

// Branch to read from
public static final String BRANCH = "branch";

// Tag to read from
public static final String TAG = "tag";

// Overrides the table's read.split.target-size and read.split.metadata-target-size
public static final String SPLIT_SIZE = "split-size";

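A minimal usage sketch of the two new read options (not part of this diff; the table path and ref names are hypothetical, and an existing SparkSession named spark plus a table that already has the branch and tag are assumed):

// Hypothetical path and ref names; assumes an existing SparkSession `spark` and an
// Iceberg table at that path with a branch "audit" and a tag "v1" already created.
Dataset<Row> branchRows =
    spark.read().format("iceberg").option(SparkReadOptions.BRANCH, "audit").load("/tmp/iceberg-table");

Dataset<Row> tagRows =
    spark.read().format("iceberg").option(SparkReadOptions.TAG, "v1").load("/tmp/iceberg-table");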
@@ -67,6 +67,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
private final Long startSnapshotId;
private final Long endSnapshotId;
private final Long asOfTimestamp;
private final String branch;
private final String tag;
private final List<Expression> runtimeFilterExpressions;

private Set<Integer> specIds = null; // lazy cache of scanned spec IDs
@@ -88,6 +90,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
this.asOfTimestamp = readConf.asOfTimestamp();
this.branch = readConf.branch();
this.tag = readConf.tag();
this.runtimeFilterExpressions = Lists.newArrayList();

if (scan == null) {
@@ -244,13 +248,22 @@ public Statistics estimateStatistics() {
Snapshot snapshot = table().snapshot(snapshotIdAsOfTime);
return estimateStatistics(snapshot);

} else if (branch != null) {
Snapshot snapshot = table().snapshot(branch);
return estimateStatistics(snapshot);

} else if (tag != null) {
Snapshot snapshot = table().snapshot(tag);
return estimateStatistics(snapshot);

} else {
Snapshot snapshot = table().currentSnapshot();
return estimateStatistics(snapshot);
}
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -269,7 +282,9 @@ && readSchema().equals(that.readSchema())
&& Objects.equals(snapshotId, that.snapshotId)
&& Objects.equals(startSnapshotId, that.startSnapshotId)
&& Objects.equals(endSnapshotId, that.endSnapshotId)
&& Objects.equals(asOfTimestamp, that.asOfTimestamp);
&& Objects.equals(asOfTimestamp, that.asOfTimestamp)
&& Objects.equals(branch, that.branch)
&& Objects.equals(tag, that.tag);
}
@namrathamyske (Contributor, Author) commented on Oct 30, 2022:

have to uncomment this, but getting a checkstyle cyclomatic complexity error.

A contributor replied:

Considering it's required for a correct equals implementation of SparkBatchQueryScan, I think it makes the most sense just to suppress the warnings on the method @SuppressWarnings("checkstyle:CyclomaticComplexity")
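A minimal sketch of that suggestion (the class below is simplified and hypothetical, not the actual SparkBatchQueryScan; it only illustrates placing the suppression on the method, which is what the @SuppressWarnings line in the equals hunk above does):

// Hypothetical, simplified class used only to illustrate the method-level suppression.
class RefAwareScanKey {
  private final Long snapshotId;
  private final String branch;
  private final String tag;

  RefAwareScanKey(Long snapshotId, String branch, String tag) {
    this.snapshotId = snapshotId;
    this.branch = branch;
    this.tag = tag;
  }

  @Override
  @SuppressWarnings("checkstyle:CyclomaticComplexity") // suppress the rule on this method only
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    RefAwareScanKey that = (RefAwareScanKey) o;
    return java.util.Objects.equals(snapshotId, that.snapshotId)
        && java.util.Objects.equals(branch, that.branch)
        && java.util.Objects.equals(tag, that.tag);
  }

  @Override
  public int hashCode() {
    return java.util.Objects.hash(snapshotId, branch, tag);
  }
}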


@Override
@@ -282,7 +297,9 @@ public int hashCode() {
snapshotId,
startSnapshotId,
endSnapshotId,
asOfTimestamp);
asOfTimestamp,
branch,
tag);
}

@Override
@@ -183,6 +183,8 @@ private Schema schemaWithMetadataColumns() {
public Scan build() {
Long snapshotId = readConf.snapshotId();
Long asOfTimestamp = readConf.asOfTimestamp();
String branch = readConf.branch();
String tag = readConf.tag();

Preconditions.checkArgument(
snapshotId == null || asOfTimestamp == null,
@@ -226,6 +228,14 @@ public Scan build() {
scan = scan.asOfTime(asOfTimestamp);
}

if (branch != null) {
scan = scan.useRef(branch);
}

if (tag != null) {
scan = scan.useRef(tag);
}

if (startSnapshotId != null) {
if (endSnapshotId != null) {
scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
@@ -241,10 +251,15 @@

public Scan buildChangelogScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
"Cannot set neither %s nor %s for changelogs",
readConf.snapshotId() == null
&& readConf.asOfTimestamp() == null
&& readConf.branch() == null
&& readConf.tag() == null,
"Cannot set neither %s, %s, %s and %s for changelogs",
SparkReadOptions.SNAPSHOT_ID,
SparkReadOptions.AS_OF_TIMESTAMP);
SparkReadOptions.AS_OF_TIMESTAMP,
SparkReadOptions.BRANCH,
SparkReadOptions.TAG);

Long startSnapshotId = readConf.startSnapshotId();
Long endSnapshotId = readConf.endSnapshotId();
@@ -273,10 +288,15 @@ public Scan buildChangelogScan() {

public Scan buildMergeOnReadScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
"Cannot set time travel options %s and %s for row-level command scans",
readConf.snapshotId() == null
&& readConf.asOfTimestamp() == null
&& readConf.branch() == null
&& readConf.tag() == null,
"Cannot set time travel options %s, %s, %s and %s for row-level command scans",
SparkReadOptions.SNAPSHOT_ID,
SparkReadOptions.AS_OF_TIMESTAMP);
SparkReadOptions.AS_OF_TIMESTAMP,
SparkReadOptions.BRANCH,
SparkReadOptions.TAG);

Preconditions.checkArgument(
readConf.startSnapshotId() == null && readConf.endSnapshotId() == null,
@@ -226,4 +226,148 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
.hasMessageContaining("Cannot specify both snapshot-id")
.hasMessageContaining("and as-of-timestamp");
}

@Test
public void testSnapshotSelectionByTag() throws IOException {
@rdblue (Contributor) commented on Oct 17, 2022:

I think we also need tests to show that branch and tag options can't be used at the same time, and tests to validate what happens when snapshot or timestamp are set along with branch or tag. It should be easy to make a few tests for those error cases.

String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

// produce the second snapshot
List<SimpleRecord> secondBatchRecords =
Lists.newArrayList(
new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

// verify records in the current snapshot by tag
Dataset<Row> currentSnapshotResult =
spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
List<SimpleRecord> currentSnapshotRecords =
currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
}

@Test
public void testSnapshotSelectionByBranch() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();

// produce the second snapshot
List<SimpleRecord> secondBatchRecords =
Lists.newArrayList(
new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

// verify records in the current snapshot by branch
Dataset<Row> currentSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> currentSnapshotRecords =
currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
}

@Test
public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

Assertions.assertThatThrownBy(
() ->
spark
.read()
.format("iceberg")
.option(SparkReadOptions.TAG, "tag")
.option(SparkReadOptions.BRANCH, "branch")
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
}

@Test
public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

long timestamp = System.currentTimeMillis();
table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

Assertions.assertThatThrownBy(
() ->
spark
.read()
.format("iceberg")
.option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
.option(SparkReadOptions.BRANCH, "branch")
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");

Assertions.assertThatThrownBy(
() ->
spark
.read()
.format("iceberg")
.option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
.option(SparkReadOptions.TAG, "tag")
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
}
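
// Illustrative sketch only, not part of this diff: the review comment above also asks what
// happens when snapshot-id is combined with branch or tag, which the added tests do not cover.
// Based on the as-of-timestamp assertions above, useRef is assumed to fail once a snapshot id
// has already been set; the expected exception type and message prefix are an assumption.
@Test
public void testSnapshotSelectionBySnapshotIdAndBranchFails() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch("branch", snapshotId).commit();

Assertions.assertThatThrownBy(
() ->
spark
.read()
.format("iceberg")
.option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
.option(SparkReadOptions.BRANCH, "branch")
.load(tableLocation)
.show())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
}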
}