Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
import io.trino.testing.AbstractTestQueryFramework;
import org.testng.annotations.Test;

import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -128,4 +134,65 @@ public void testShowSchemas()
showCreateIcebergWithRedirectionsSchema,
getExpectedIcebergCreateSchema("iceberg_with_redirections"));
}

@Test
public void testTimeTravelWithRedirection()
throws InterruptedException
{
String testLocalSchema = "test_schema_" + randomTableSuffix();
try {
assertUpdate("CREATE SCHEMA iceberg. " + testLocalSchema);
assertUpdate(format("CREATE TABLE iceberg.%s.nation_test AS SELECT * FROM nation", testLocalSchema), 25);
assertQuery("SELECT * FROM hive_with_redirections." + testLocalSchema + ".nation_test", "SELECT * FROM nation");
long snapshot1 = getLatestSnapshotId(testLocalSchema);
long v1EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot1, testLocalSchema);
Thread.sleep(1);
assertUpdate(format("INSERT INTO hive_with_redirections.%s.nation_test VALUES(25, 'POLAND', 3, 'test 1')", testLocalSchema), 1);
long snapshot2 = getLatestSnapshotId(testLocalSchema);
long v2EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot2, testLocalSchema);
Thread.sleep(1);
assertUpdate(format("INSERT INTO hive_with_redirections.%s.nation_test VALUES(26, 'CHILE', 1, 'test 2')", testLocalSchema), 1);
long snapshot3 = getLatestSnapshotId(testLocalSchema);
long v3EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot3, testLocalSchema);
long incorrectSnapshot = 2324324333L;
Thread.sleep(1);
assertQuery(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot1), "SELECT * FROM nation");
assertQuery(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v1EpochMillis)), "SELECT * FROM nation");
assertQuery(format("SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot2), "VALUES(26)");
assertQuery(format(
"SELECT count(*) FROM iceberg_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v2EpochMillis)), "VALUES(26)");
assertQuery(format("SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot3), "VALUES(27)");
assertQuery(format(
"SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v3EpochMillis)), "VALUES(27)");
assertQueryFails(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, incorrectSnapshot), "Iceberg snapshot ID does not exists: " + incorrectSnapshot);
assertQueryFails(
format("SELECT * FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", testLocalSchema),
format("\\QNo version history table \"%s\".\"nation_test\" at or before 1970-01-01T00:00:00.001Z", testLocalSchema));
assertQueryFails(
format("SELECT * FROM iceberg_with_redirections.%s.region FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", schema),
"\\QThis connector does not support versioned tables");
}
finally {
query("DROP TABLE IF EXISTS iceberg." + testLocalSchema + ".nation_test");
query("DROP SCHEMA IF EXISTS iceberg." + testLocalSchema);
}
}

private long getLatestSnapshotId(String schema)
{
return (long) computeActual(format("SELECT snapshot_id FROM iceberg.%s.\"nation_test$snapshots\" ORDER BY committed_at DESC LIMIT 1", schema))
.getOnlyValue();
}

private long getCommittedAtInEpochMilliSeconds(long snapshotId, String schema)
{
return ((ZonedDateTime) computeActual(format("SELECT committed_at FROM iceberg.%s.\"nation_test$snapshots\" WHERE snapshot_id=%s LIMIT 1", schema, snapshotId)).getOnlyValue())
.toInstant().toEpochMilli();
}

private static String timestampLiteral(long epochMilliSeconds)
{
return DateTimeFormatter.ofPattern("'TIMESTAMP '''uuuu-MM-dd HH:mm:ss." + "S".repeat(9) + " VV''")
.format(Instant.ofEpochMilli(epochMilliSeconds).atZone(UTC));
}
}