diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 84be79463a1d..9714ec3f4a88 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -37,6 +37,7 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -159,7 +160,15 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep sparkTable.snapshotId() == null, "Cannot do time-travel based on both table identifier and AS OF"); - return sparkTable.copyWithSnapshotId(Long.parseLong(version)); + try { + return sparkTable.copyWithSnapshotId(Long.parseLong(version)); + } catch (NumberFormatException e) { + SnapshotRef ref = sparkTable.table().refs().get(version); + ValidationException.check( + ref != null, + "Cannot find matching snapshot ID or reference name for version " + version); + return sparkTable.copyWithSnapshotId(ref.snapshotId()); + } } else if (table instanceof SparkChangelogTable) { throw new UnsupportedOperationException("AS OF is not supported for changelogs"); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index 360deecbfd9b..9f074c6f9b61 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -24,8 +24,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Table; import org.apache.iceberg.events.Listeners; import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.Spark3Util; @@ -33,6 +35,7 @@ import org.apache.iceberg.spark.SparkReadOptions; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -231,6 +234,92 @@ public void testVersionAsOf() { assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF); } + @Test + public void testTagReferenceAsOf() { + Table table = validationCatalog.loadTable(tableIdent); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag("test_tag", snapshotId).commit(); + + // create a second snapshot, read the table at the snapshot + List expected = sql("SELECT * FROM %s", tableName); + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + List actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName); + assertEquals("Snapshot at specific tag reference name", expected, actual1); + + // read the table at the snapshot + // HIVE time travel syntax + List actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName); + assertEquals("Snapshot at specific tag reference name", expected, actual2); + + // read the table using DataFrameReader option: branch + Dataset df = + spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName); + List fromDF = rowsToJava(df.collectAsList()); + assertEquals("Snapshot at specific tag reference name", expected, fromDF); + } + + @Test + public void testUseSnapshotIdForTagReferenceAsOf() { + Table table = validationCatalog.loadTable(tableIdent); + long snapshotId1 = table.currentSnapshot().snapshotId(); + + // create a second snapshot, read the table at the snapshot + List actual = sql("SELECT * FROM %s", tableName); + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + table.refresh(); + long snapshotId2 = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag(Long.toString(snapshotId1), snapshotId2).commit(); + + // currently Spark version travel ignores the type of the AS OF + // this means if a tag name matches a snapshot ID, it will always choose snapshotID to travel + // to. + List travelWithStringResult = + sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, snapshotId1); + assertEquals("Snapshot at specific tag reference name", actual, travelWithStringResult); + + List travelWithLongResult = + sql("SELECT * FROM %s VERSION AS OF %s", tableName, snapshotId1); + assertEquals("Snapshot at specific tag reference name", actual, travelWithLongResult); + } + + @Test + public void testBranchReferenceAsOf() { + Table table = validationCatalog.loadTable(tableIdent); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch("test_branch", snapshotId).commit(); + + // create a second snapshot, read the table at the snapshot + List expected = sql("SELECT * FROM %s", tableName); + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + List actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName); + assertEquals("Snapshot at specific branch reference name", expected, actual1); + + // read the table at the snapshot + // HIVE time travel syntax + List actual2 = + sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName); + assertEquals("Snapshot at specific branch reference name", expected, actual2); + + // read the table using DataFrameReader option: branch + Dataset df = + spark + .read() + .format("iceberg") + .option(SparkReadOptions.BRANCH, "test_branch") + .load(tableName); + List fromDF = rowsToJava(df.collectAsList()); + assertEquals("Snapshot at specific branch reference name", expected, fromDF); + } + + @Test + public void testUnknownReferenceAsOf() { + Assertions.assertThatThrownBy( + () -> sql("SELECT * FROM %s VERSION AS OF 'test_unknown'", tableName)) + .hasMessageContaining("Cannot find matching snapshot ID or reference name for version") + .isInstanceOf(ValidationException.class); + } + @Test public void testTimestampAsOf() { long snapshotTs = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis();