Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -105,6 +108,7 @@
import static io.trino.transaction.TransactionBuilder.transaction;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.time.ZoneOffset.UTC;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
Expand Down Expand Up @@ -3699,6 +3703,37 @@ public void testModifyingOldSnapshotIsNotPossible()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testCreateTableAsSelectFromVersionedTable()
throws Exception
{
String sourceTableName = "test_ctas_versioned_source_" + randomTableSuffix();
String snapshotVersionedSinkTableName = "test_ctas_snapshot_versioned_sink_" + randomTableSuffix();
String timestampVersionedSinkTableName = "test_ctas_timestamp_versioned_sink_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + sourceTableName + "(an_integer integer)");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assertUpdate("CREATE TABLE " + sourceTableName + "(an_integer integer)");
assertUpdate("CREATE TABLE " + sourceTableName + "(an_integer INTEGER)");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an_integer integer matches the convention in place used for the other tests.
Why do you suggest uppercasing the column type ?

// Enforce having exactly one snapshot of the table at the timestamp corresponding to `afterInsert123EpochMillis`
Thread.sleep(1);
Copy link
Copy Markdown
Member

@homar homar Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of this Thread.sleep? Next one makes sense but this one I don't understand.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning behind this Thread.sleep(1) is to avoid having two snapshots at the timestamp afterInsert123EpochMillis

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a code comment.

assertUpdate("INSERT INTO " + sourceTableName + " VALUES 1, 2, 3", 3);
long afterInsert123SnapshotId = getLatestSnapshotId(sourceTableName);
long afterInsert123EpochMillis = getCommittedAtInEpochMilliseconds(sourceTableName, afterInsert123SnapshotId);
Thread.sleep(1);
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 4, 5, 6", 3);
long afterInsert456SnapshotId = getLatestSnapshotId(sourceTableName);
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 7, 8, 9", 3);

assertUpdate("CREATE TABLE " + snapshotVersionedSinkTableName + " AS SELECT * FROM " + sourceTableName + " FOR VERSION AS OF " + afterInsert456SnapshotId, 6);
assertUpdate("CREATE TABLE " + timestampVersionedSinkTableName + " AS SELECT * FROM " + sourceTableName + " FOR TIMESTAMP AS OF " + timestampLiteral(afterInsert123EpochMillis, 9), 3);

assertQuery("SELECT * FROM " + sourceTableName, "VALUES 1, 2, 3, 4, 5, 6, 7, 8, 9");
assertQuery("SELECT * FROM " + snapshotVersionedSinkTableName, "VALUES 1, 2, 3, 4, 5, 6");
assertQuery("SELECT * FROM " + timestampVersionedSinkTableName, "VALUES 1, 2, 3");

assertUpdate("DROP TABLE " + sourceTableName);
assertUpdate("DROP TABLE " + snapshotVersionedSinkTableName);
assertUpdate("DROP TABLE " + timestampVersionedSinkTableName);
}

private Session prepareCleanUpSession()
{
return Session.builder(getSession())
Expand Down Expand Up @@ -3761,4 +3796,16 @@ private Path getIcebergTablePath(String tableName, String suffix)
String schema = getSession().getSchema().orElseThrow();
return getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data").resolve(schema).resolve(tableName).resolve(suffix);
}

private long getCommittedAtInEpochMilliseconds(String tableName, long snapshotId)
{
return ((ZonedDateTime) computeActual(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id=%s LIMIT 1", tableName, snapshotId)).getOnlyValue())
.toInstant().toEpochMilli();
}

private static String timestampLiteral(long epochMilliSeconds, int precision)
{
return DateTimeFormatter.ofPattern("'TIMESTAMP '''uuuu-MM-dd HH:mm:ss." + "S".repeat(precision) + " VV''")
.format(Instant.ofEpochMilli(epochMilliSeconds).atZone(UTC));
}
}