Merged
@@ -37,6 +37,7 @@
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -159,7 +160,15 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep
sparkTable.snapshotId() == null,
"Cannot do time-travel based on both table identifier and AS OF");

return sparkTable.copyWithSnapshotId(Long.parseLong(version));
try {
Member:

I'm not sure this can come up, but do we allow version tags to be SnapshotIds?

Like can I tag snapshot 2 to be known as 1?

Weird edge case so I don't think we really need to handle it, just thinking if this is a potential issue with the lookup code here

@amogh-jahagirdar (Contributor), Jan 18, 2023:

Currently there are no restrictions on what references can be named. For the lookup code, I think we should always be able to differentiate between a snapshot ID and a ref, since a ref will be in a quoted identifier and should always fail Long.parseLong() with a NumberFormatException. So the current implementation seems good to me.

But that's just me reading the code :), I think it's worth having a unit test just for this case to give us that confidence that it works as expected in this scenario. cc @jackye1995 let me know your thoughts

Contributor (Author):

I added a test case specifically for this. Unlike Trino, Spark ignores the type of the VERSION AS OF argument, so if a tag name matches a snapshot ID exactly, the snapshot ID is always chosen.

I think this is an okay limitation, because people can work around it by adding some text to the tag name, like snapshot-123456890. But we should make it very clear in documentation.
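The limitation discussed in this thread can be sketched in plain Java, without a Spark session. This is a minimal stand-in, not the PR's code: the REFS map (with hypothetical ref names and snapshot IDs) replaces Table.refs(), and IllegalArgumentException stands in for Iceberg's ValidationException. It shows why a numeric tag name is always shadowed by the snapshot ID of the same value.

```java
import java.util.Map;

public class VersionResolutionSketch {

  // Stand-in for Table.refs(): reference name -> snapshot ID.
  // Note the tag literally named "2", which collides with snapshot ID 2.
  private static final Map<String, Long> REFS = Map.of("test_tag", 42L, "2", 99L);

  // Mirrors the lookup order in the diff: try the version string as a
  // snapshot ID first; only on NumberFormatException fall back to refs.
  public static long resolve(String version) {
    try {
      return Long.parseLong(version);
    } catch (NumberFormatException e) {
      Long snapshotId = REFS.get(version);
      if (snapshotId == null) {
        throw new IllegalArgumentException(
            "Cannot find matching snapshot ID or reference name for version " + version);
      }
      return snapshotId;
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve("test_tag")); // non-numeric -> ref lookup -> 42
    System.out.println(resolve("2"));        // numeric wins -> 2; tag "2" (-> 99) is shadowed
  }
}
```

Because Long.parseLong succeeds for "2", the catch block is never reached and the tag named "2" is unreachable by name, which is exactly the edge case the tests below pin down.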

Member:

Yeah I don't want this to be a blocker, just something to take note of.

  return sparkTable.copyWithSnapshotId(Long.parseLong(version));
} catch (NumberFormatException e) {
  SnapshotRef ref = sparkTable.table().refs().get(version);
  ValidationException.check(
      ref != null,
      "Cannot find matching snapshot ID or reference name for version " + version);
  return sparkTable.copyWithSnapshotId(ref.snapshotId());
Member:

It always uses the latest snapshot of the reference.
I think we also need to provide a way to time travel to a snapshot within a branch/tag?

So along with the existing FOR SYSTEM_VERSION AS OF snapshotId,
we should have FOR SYSTEM_VERSION AS OF snapshotId@refName.

But whether to use '@' or some other syntax has been an open point for a long time, which @rdblue wanted to conclude.

Nessie SQL syntax for reference:
https://projectnessie.org/tools/sql/#grammar

Member:

Never mind.
After thinking a bit more about it and reviewing #6573: since the snapshot log contains all the snapshots from all the branches/tags, if we want to use any particular snapshot we can directly use its snapshot ID without specifying branch/tag information. So there is no need for the snapshotId@refName syntax.

Contributor (Author):

I think one thing that might be useful is to time travel within a branch, something like FOR SYSTEM_VERSION AS OF branchName@123456789012. But that feels very hacky; I'd rather have some syntax like we have been suggesting, e.g. SELECT * FROM table BRANCH branch FOR SYSTEM_TIME AS OF xxx. So I am leaving that part out of the implementation for now. At least I think most people can agree that a tag/branch head can be viewed as a version to travel to.

}

} else if (table instanceof SparkChangelogTable) {
throw new UnsupportedOperationException("AS OF is not supported for changelogs");
@@ -24,15 +24,18 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Table;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -231,6 +234,92 @@ public void testVersionAsOf() {
assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF);
}

@Test
public void testTagReferenceAsOf() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag("test_tag", snapshotId).commit();

// create a second snapshot, read the table at the snapshot
List<Object[]> expected = sql("SELECT * FROM %s", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual1);

// read the table at the snapshot
// HIVE time travel syntax
List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual2);

// read the table using DataFrameReader option: tag
Dataset<Row> df =
spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName);
List<Object[]> fromDF = rowsToJava(df.collectAsList());
assertEquals("Snapshot at specific tag reference name", expected, fromDF);
}

@Test
public void testUseSnapshotIdForTagReferenceAsOf() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId1 = table.currentSnapshot().snapshotId();

// create a second snapshot, read the table at the snapshot
List<Object[]> actual = sql("SELECT * FROM %s", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

table.refresh();
long snapshotId2 = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(Long.toString(snapshotId1), snapshotId2).commit();

// currently Spark version travel ignores the type of the AS OF argument; this means
// if a tag name matches a snapshot ID, it will always choose the snapshot ID to travel to
List<Object[]> travelWithStringResult =
sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, snapshotId1);
assertEquals("Snapshot at specific tag reference name", actual, travelWithStringResult);

List<Object[]> travelWithLongResult =
sql("SELECT * FROM %s VERSION AS OF %s", tableName, snapshotId1);
assertEquals("Snapshot at specific tag reference name", actual, travelWithLongResult);
}

@Test
public void testBranchReferenceAsOf() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch("test_branch", snapshotId).commit();

// create a second snapshot, read the table at the snapshot
List<Object[]> expected = sql("SELECT * FROM %s", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual1);

// read the table at the snapshot
// HIVE time travel syntax
List<Object[]> actual2 =
sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual2);

// read the table using DataFrameReader option: branch
Dataset<Row> df =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.BRANCH, "test_branch")
.load(tableName);
List<Object[]> fromDF = rowsToJava(df.collectAsList());
assertEquals("Snapshot at specific branch reference name", expected, fromDF);
}

@Test
public void testUnknownReferenceAsOf() {
Assertions.assertThatThrownBy(
() -> sql("SELECT * FROM %s VERSION AS OF 'test_unknown'", tableName))
.hasMessageContaining("Cannot find matching snapshot ID or reference name for version")
.isInstanceOf(ValidationException.class);
}

@Test
public void testTimestampAsOf() {
long snapshotTs = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis();