diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 1368c26792ee..54d8dd01b92e 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -235,23 +235,30 @@ public void testVersionAsOf() {
   }
 
   @Test
-  public void testTagReferenceAsOf() {
+  public void testTagReference() {
     Table table = validationCatalog.loadTable(tableIdent);
     long snapshotId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createTag("test_tag", snapshotId).commit();
-
-    // create a second snapshot, read the table at the snapshot
     List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot, read the table at the tag
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
     List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName);
     assertEquals("Snapshot at specific tag reference name", expected, actual1);
 
-    // read the table at the snapshot
+    // read the table at the tag
     // HIVE time travel syntax
     List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName);
     assertEquals("Snapshot at specific tag reference name", expected, actual2);
 
-    // read the table using DataFrameReader option: branch
+    // Spark session catalog does not support extended table names
+    if (!"spark_catalog".equals(catalogName)) {
+      // read the table using the "tag_" prefix in the table name
+      List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag", tableName);
+      assertEquals("Snapshot at specific tag reference name, prefix", expected, actual3);
+    }
+
+    // read the table using DataFrameReader option: tag
     Dataset<Row> df =
         spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName);
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
@@ -284,23 +291,30 @@ public void testUseSnapshotIdForTagReferenceAsOf() {
   }
 
   @Test
-  public void testBranchReferenceAsOf() {
+  public void testBranchReference() {
     Table table = validationCatalog.loadTable(tableIdent);
     long snapshotId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createBranch("test_branch", snapshotId).commit();
-
-    // create a second snapshot, read the table at the snapshot
     List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot, read the table at the branch
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
     List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName);
     assertEquals("Snapshot at specific branch reference name", expected, actual1);
 
-    // read the table at the snapshot
+    // read the table at the branch
     // HIVE time travel syntax
     List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName);
     assertEquals("Snapshot at specific branch reference name", expected, actual2);
 
+    // Spark session catalog does not support extended table names
+    if (!"spark_catalog".equals(catalogName)) {
+      // read the table using the "branch_" prefix in the table name
+      List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch", tableName);
+      assertEquals("Snapshot at specific branch reference name, prefix", expected, actual3);
+    }
+
     // read the table using DataFrameReader option: branch
     Dataset<Row> df =
         spark
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 161c2e0ba637..93bb21b41a2b 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -234,23 +234,30 @@ public void testVersionAsOf() {
   }
 
   @Test
-  public void testTagReferenceAsOf() {
+  public void testTagReference() {
     Table table = validationCatalog.loadTable(tableIdent);
     long snapshotId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createTag("test_tag", snapshotId).commit();
-
-    // create a second snapshot, read the table at the snapshot
     List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot, read the table at the tag
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
     List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName);
     assertEquals("Snapshot at specific tag reference name", expected, actual1);
 
-    // read the table at the snapshot
+    // read the table at the tag
     // HIVE time travel syntax
     List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName);
     assertEquals("Snapshot at specific tag reference name", expected, actual2);
 
-    // read the table using DataFrameReader option: branch
+    // Spark session catalog does not support extended table names
+    if (!"spark_catalog".equals(catalogName)) {
+      // read the table using the "tag_" prefix in the table name
+      List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag", tableName);
+      assertEquals("Snapshot at specific tag reference name, prefix", expected, actual3);
+    }
+
+    // read the table using DataFrameReader option: tag
     Dataset<Row> df =
         spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName);
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
@@ -283,23 +290,30 @@ public void testUseSnapshotIdForTagReferenceAsOf() {
   }
 
   @Test
-  public void testBranchReferenceAsOf() {
+  public void testBranchReference() {
     Table table = validationCatalog.loadTable(tableIdent);
     long snapshotId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createBranch("test_branch", snapshotId).commit();
-
-    // create a second snapshot, read the table at the snapshot
     List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot, read the table at the branch
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
     List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName);
     assertEquals("Snapshot at specific branch reference name", expected, actual1);
 
-    // read the table at the snapshot
+    // read the table at the branch
     // HIVE time travel syntax
     List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName);
     assertEquals("Snapshot at specific branch reference name", expected, actual2);
 
+    // Spark session catalog does not support extended table names
+    if (!"spark_catalog".equals(catalogName)) {
+      // read the table using the "branch_" prefix in the table name
+      List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch", tableName);
+      assertEquals("Snapshot at specific branch reference name, prefix", expected, actual3);
+    }
+
     // read the table using DataFrameReader option: branch
     Dataset<Row> df =
         spark
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 161c2e0ba637..93bb21b41a2b 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -234,23 +234,30 @@ public void testVersionAsOf() {
   }
 
   @Test
-  public void testTagReferenceAsOf() {
+  public void testTagReference() {
     Table table = validationCatalog.loadTable(tableIdent);
     long snapshotId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createTag("test_tag", snapshotId).commit();
-
-    // create a second snapshot, read the table at the snapshot
     List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot, read the table at the tag
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
     List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName);
     assertEquals("Snapshot at specific tag reference name", expected, actual1);
 
-    // read the table at the snapshot
+    // read the table at the tag
     // HIVE time travel syntax
     List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName);
     assertEquals("Snapshot at specific tag reference name", expected, actual2);
 
-    // read the table using DataFrameReader option: branch
+    // Spark session catalog does not support extended table names
+    if (!"spark_catalog".equals(catalogName)) {
+      // read the table using the "tag_" prefix in the table name
+      List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag", tableName);
+      assertEquals("Snapshot at specific tag reference name, prefix", expected, actual3);
+    }
+
+    // read the table using DataFrameReader option: tag
     Dataset<Row> df =
         spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName);
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
@@ -283,23 +290,30 @@ public void testUseSnapshotIdForTagReferenceAsOf() {
   }
 
   @Test
-  public void testBranchReferenceAsOf() {
+  public void testBranchReference() {
     Table table = validationCatalog.loadTable(tableIdent);
     long snapshotId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createBranch("test_branch", snapshotId).commit();
-
-    // create a second snapshot, read the table at the snapshot
     List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot, read the table at the branch
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
     List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName);
     assertEquals("Snapshot at specific branch reference name", expected, actual1);
 
-    // read the table at the snapshot
+    // read the table at the branch
     // HIVE time travel syntax
     List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName);
     assertEquals("Snapshot at specific branch reference name", expected, actual2);
 
+    // Spark session catalog does not support extended table names
+    if (!"spark_catalog".equals(catalogName)) {
+      // read the table using the "branch_" prefix in the table name
+      List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch", tableName);
+      assertEquals("Snapshot at specific branch reference name, prefix", expected, actual3);
+    }
+
     // read the table using DataFrameReader option: branch
     Dataset<Row> df =
         spark
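For reviewers who want to try the read paths these tests cover outside the test harness, here is a minimal sketch. It assumes a running Spark session with an Iceberg catalog named my_catalog and an existing table db.tbl that already has a tag test_tag and a branch test_branch; those names are placeholders for illustration, not part of this change. The new coverage in this diff is the tag_/branch_ prefixed table identifiers (actual3); the other read paths were already exercised and only get renamed tests and corrected comments.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BranchAndTagReads {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("branch-tag-reads").getOrCreate();

    // SQL time travel by reference name (what actual1 and actual2 assert)
    Dataset<Row> byVersion =
        spark.sql("SELECT * FROM my_catalog.db.tbl VERSION AS OF 'test_tag'");
    Dataset<Row> bySystemVersion =
        spark.sql("SELECT * FROM my_catalog.db.tbl FOR SYSTEM_VERSION AS OF 'test_branch'");

    // Extended table names with the tag_/branch_ prefix (what the new actual3 checks);
    // the tests skip this path for spark_catalog, which does not support extended table names
    Dataset<Row> byTagName = spark.sql("SELECT * FROM my_catalog.db.tbl.tag_test_tag");
    Dataset<Row> byBranchName = spark.sql("SELECT * FROM my_catalog.db.tbl.branch_test_branch");

    // DataFrameReader options ("tag" and "branch" are the keys behind
    // SparkReadOptions.TAG and SparkReadOptions.BRANCH)
    Dataset<Row> byTagOption =
        spark.read().format("iceberg").option("tag", "test_tag").load("my_catalog.db.tbl");
    Dataset<Row> byBranchOption =
        spark.read().format("iceberg").option("branch", "test_branch").load("my_catalog.db.tbl");

    // each read resolves to the snapshot the tag or branch points at,
    // not to the table's current snapshot
    byVersion.show();
    bySystemVersion.show();
    byTagName.show();
    byBranchName.show();
    byTagOption.show();
    byBranchOption.show();

    spark.stop();
  }
}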