From 0e2e789a71b5682f2d542ecc650e148821eebf6d Mon Sep 17 00:00:00 2001 From: Konrad Dziedzic Date: Tue, 14 Jun 2022 01:15:49 +0200 Subject: [PATCH] Add tests for AS OF syntax for tables with redirects --- .../iceberg/BaseSharedMetastoreTest.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseSharedMetastoreTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseSharedMetastoreTest.java index 0037de7eb54a..18d5703fcbc8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseSharedMetastoreTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseSharedMetastoreTest.java @@ -16,7 +16,13 @@ import io.trino.testing.AbstractTestQueryFramework; import org.testng.annotations.Test; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.lang.String.format; +import static java.time.ZoneOffset.UTC; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; @@ -128,4 +134,65 @@ public void testShowSchemas() showCreateIcebergWithRedirectionsSchema, getExpectedIcebergCreateSchema("iceberg_with_redirections")); } + + @Test + public void testTimeTravelWithRedirection() + throws InterruptedException + { + String testLocalSchema = "test_schema_" + randomTableSuffix(); + try { + assertUpdate("CREATE SCHEMA iceberg. " + testLocalSchema); + assertUpdate(format("CREATE TABLE iceberg.%s.nation_test AS SELECT * FROM nation", testLocalSchema), 25); + assertQuery("SELECT * FROM hive_with_redirections." + testLocalSchema + ".nation_test", "SELECT * FROM nation"); + long snapshot1 = getLatestSnapshotId(testLocalSchema); + long v1EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot1, testLocalSchema); + Thread.sleep(1); + assertUpdate(format("INSERT INTO hive_with_redirections.%s.nation_test VALUES(25, 'POLAND', 3, 'test 1')", testLocalSchema), 1); + long snapshot2 = getLatestSnapshotId(testLocalSchema); + long v2EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot2, testLocalSchema); + Thread.sleep(1); + assertUpdate(format("INSERT INTO hive_with_redirections.%s.nation_test VALUES(26, 'CHILE', 1, 'test 2')", testLocalSchema), 1); + long snapshot3 = getLatestSnapshotId(testLocalSchema); + long v3EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot3, testLocalSchema); + long incorrectSnapshot = 2324324333L; + Thread.sleep(1); + assertQuery(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot1), "SELECT * FROM nation"); + assertQuery(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v1EpochMillis)), "SELECT * FROM nation"); + assertQuery(format("SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot2), "VALUES(26)"); + assertQuery(format( + "SELECT count(*) FROM iceberg_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v2EpochMillis)), "VALUES(26)"); + assertQuery(format("SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot3), "VALUES(27)"); + assertQuery(format( + "SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v3EpochMillis)), "VALUES(27)"); + assertQueryFails(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, incorrectSnapshot), "Iceberg snapshot ID does not exists: " + incorrectSnapshot); + assertQueryFails( + format("SELECT * FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", testLocalSchema), + format("\\QNo version history table \"%s\".\"nation_test\" at or before 1970-01-01T00:00:00.001Z", testLocalSchema)); + assertQueryFails( + format("SELECT * FROM iceberg_with_redirections.%s.region FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", schema), + "\\QThis connector does not support versioned tables"); + } + finally { + query("DROP TABLE IF EXISTS iceberg." + testLocalSchema + ".nation_test"); + query("DROP SCHEMA IF EXISTS iceberg." + testLocalSchema); + } + } + + private long getLatestSnapshotId(String schema) + { + return (long) computeActual(format("SELECT snapshot_id FROM iceberg.%s.\"nation_test$snapshots\" ORDER BY committed_at DESC LIMIT 1", schema)) + .getOnlyValue(); + } + + private long getCommittedAtInEpochMilliSeconds(long snapshotId, String schema) + { + return ((ZonedDateTime) computeActual(format("SELECT committed_at FROM iceberg.%s.\"nation_test$snapshots\" WHERE snapshot_id=%s LIMIT 1", schema, snapshotId)).getOnlyValue()) + .toInstant().toEpochMilli(); + } + + private static String timestampLiteral(long epochMilliSeconds) + { + return DateTimeFormatter.ofPattern("'TIMESTAMP '''uuuu-MM-dd HH:mm:ss." + "S".repeat(9) + " VV''") + .format(Instant.ofEpochMilli(epochMilliSeconds).atZone(UTC)); + } }