Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1631,9 +1631,11 @@ In this example, SYSTEM_TIME can be used as an alias for TIMESTAMP.

// In the following query, the timestamp string matches the second inserted record.
SELECT * FROM ctas_nation FOR TIMESTAMP AS OF TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';
SELECT * FROM ctas_nation FOR TIMESTAMP AS OF TIMESTAMP '2023-10-17 13:29:46.822';

// Same example using SYSTEM_TIME as an alias for TIMESTAMP
SELECT * FROM ctas_nation FOR SYSTEM_TIME AS OF TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';
SELECT * FROM ctas_nation FOR SYSTEM_TIME AS OF TIMESTAMP '2023-10-17 13:29:46.822';

.. code-block:: text

Expand All @@ -1643,8 +1645,12 @@ In this example, SYSTEM_TIME can be used as an alias for TIMESTAMP.
20 | canada | 2 | comment
(2 rows)

The option following FOR TIMESTAMP AS OF can accept any expression that returns a timestamp with time zone value.
For example, `TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles'` is a constant string for the expression.
.. note::

A timestamp without a time zone is parsed and rendered in the session time zone. See `TIMESTAMP <https://prestodb.io/docs/current/language/types.html#timestamp>`_.

The option following FOR TIMESTAMP AS OF can accept any expression that returns a timestamp or timestamp with time zone value.
For example, `TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles'` and `TIMESTAMP '2023-10-17 13:29:46.822'` are both valid timestamps. The first specifies the timestamp in the time zone `America/Los_Angeles`. The second is interpreted in the user's session time zone.
In the following query, the expression CURRENT_TIMESTAMP returns the current timestamp with time zone value.

.. code-block:: sql
Expand All @@ -1665,6 +1671,7 @@ In the following query, the expression CURRENT_TIMESTAMP returns the current tim
// In the following query, the timestamp string matches the second inserted record.
// The BEFORE clause returns the first record, whose timestamp is earlier than that of the second record.
SELECT * FROM ctas_nation FOR TIMESTAMP BEFORE TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';
SELECT * FROM ctas_nation FOR TIMESTAMP BEFORE TIMESTAMP '2023-10-17 13:29:46.822';

.. code-block:: text

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.SqlTimestampWithTimeZone;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.VarcharType;
Expand Down Expand Up @@ -981,6 +982,11 @@ private static long getSnapshotIdForTableVersion(Table table, ConnectorTableVers
long millisUtc = new SqlTimestampWithTimeZone((long) tableVersion.getTableVersion()).getMillisUtc();
return getSnapshotIdTimeOperator(table, millisUtc, tableVersion.getVersionOperator());
}
else if (tableVersion.getVersionExpressionType() instanceof TimestampType) {
long timestampValue = (long) tableVersion.getTableVersion();
long millisUtc = ((TimestampType) tableVersion.getVersionExpressionType()).getPrecision().toMillis(timestampValue);
return getSnapshotIdTimeOperator(table, millisUtc, tableVersion.getVersionOperator());
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported table version expression type: " + tableVersion.getVersionExpressionType());
}
if (tableVersion.getVersionType() == VersionType.VERSION) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,31 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.Session;
import com.facebook.presto.Session.SessionBuilder;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;

import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.testng.Assert.assertTrue;

public class TestIcebergTableVersion
extends AbstractTestQueryFramework
Expand Down Expand Up @@ -271,6 +281,55 @@ public void testTableVersionMisc()
assertQuery("SELECT count(*) FROM " + viewName3 + " INNER JOIN " + viewName4 + " ON " + viewName3 + ".id = " + viewName4 + ".id", "VALUES 2");
}

/**
 * Data provider registered under the name "timezones".
 * Each row is a pair of (zone id string, legacy-timestamp flag).
 */
@DataProvider(name = "timezones")
public Object[][] timezones()
{
    Object[][] cases = new Object[4][];
    cases[0] = new Object[] {"UTC", true};
    cases[1] = new Object[] {"America/Los_Angeles", true};
    cases[2] = new Object[] {"Asia/Shanghai", true};
    cases[3] = new Object[] {"UTC", false};
    return cases;
}

/**
 * Checks FOR TIMESTAMP AS OF / BEFORE with timestamp literals that carry no time zone.
 * Three rows are inserted one at a time; after each insert a wall-clock reading is captured,
 * rendered as a zone-less timestamp string for {@code zoneId}, and later used to query the
 * table as of (or before) that moment.
 */
@Test(dataProvider = "timezones")
public void testTableVersionWithTimestamp(String zoneId, boolean legacyTimestamp)
{
    Session zonedSession = sessionForTimezone(zoneId, legacyTimestamp);
    String tableName = schemaName + "." + "table_version_with_timestamp";
    // Everything up to the opening quote of the timestamp literal is shared by the queries below.
    String asOfPrefix = "SELECT desc FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '";
    String beforePrefix = "SELECT desc FROM " + tableName + " FOR TIMESTAMP BEFORE TIMESTAMP '";
    try {
        assertUpdate(zonedSession, "CREATE TABLE " + tableName + " (id integer, desc varchar) WITH(partitioning = ARRAY['id'])");

        // First row, then a clock reading taken in a later millisecond than the insert finished.
        assertUpdate(zonedSession, "INSERT INTO " + tableName + " VALUES(1, 'aaa')", 1);
        waitUntilAfter(System.currentTimeMillis());
        long millisAfterFirstInsert = System.currentTimeMillis();
        String afterFirstInsert = getTimestampString(millisAfterFirstInsert, zoneId);
        waitUntilAfter(millisAfterFirstInsert);

        // Second row, same pattern.
        assertUpdate(zonedSession, "INSERT INTO " + tableName + " VALUES(2, 'bbb')", 1);
        waitUntilAfter(System.currentTimeMillis());
        long millisAfterSecondInsert = System.currentTimeMillis();
        String afterSecondInsert = getTimestampString(millisAfterSecondInsert, zoneId);
        waitUntilAfter(millisAfterSecondInsert);

        // Third row; no trailing wait is needed after the last reading.
        assertUpdate(zonedSession, "INSERT INTO " + tableName + " VALUES(3, 'ccc')", 1);
        waitUntilAfter(System.currentTimeMillis());
        long millisAfterThirdInsert = System.currentTimeMillis();
        String afterThirdInsert = getTimestampString(millisAfterThirdInsert, zoneId);

        assertQuery(zonedSession, asOfPrefix + afterFirstInsert + "'", "VALUES 'aaa'");
        assertQuery(zonedSession, beforePrefix + afterFirstInsert + "'", "VALUES 'aaa'");
        assertQuery(zonedSession, asOfPrefix + afterSecondInsert + "'", "VALUES 'aaa', 'bbb'");
        assertQuery(zonedSession, beforePrefix + afterSecondInsert + "'", "VALUES 'aaa', 'bbb'");
        assertQuery(zonedSession, asOfPrefix + afterThirdInsert + "'", "VALUES 'aaa', 'bbb', 'ccc'");
        assertQuery(zonedSession, beforePrefix + afterThirdInsert + "'", "VALUES 'aaa', 'bbb', 'ccc'");
    }
    finally {
        assertQuerySucceeds("DROP TABLE IF EXISTS " + tableName);
    }
}

@Test
public void testTableVersionErrors()
{
Expand All @@ -284,23 +343,56 @@ public void testTableVersionErrors()
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF " + tab2VersionId1 + " - " + tab2VersionId1, "Iceberg snapshot ID does not exists: 0");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION AS OF CAST (100 AS BIGINT)", "Iceberg snapshot ID does not exists: 100");

assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF 100", ".* Type integer is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF 'bad'", ".* Type varchar\\(3\\) is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF 100", ".* Type integer is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF 'bad'", ".* Type varchar\\(3\\) is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF id", ".* cannot be resolved");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF (SELECT CURRENT_TIMESTAMP)", ".* Constant expression cannot contain a subquery");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF NULL", "Table version AS OF/BEFORE expression cannot be NULL for .*");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP WITH TIME ZONE)", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP)", ".* Type timestamp is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS DATE)", ".* Type date is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CURRENT_DATE", ".* Type date is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP '2023-01-01 00:00:00.000'", ".* Type timestamp is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP)", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS DATE)", ".* Type date is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CURRENT_DATE", ".* Type date is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP '2023-01-01 00:00:00.000'", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");

assertQueryFails("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId1 + " ORDER BY 1", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab1\"");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION BEFORE 100", ".* Type integer is invalid. Supported table version AS OF/BEFORE expression type is BIGINT or VARCHAR");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION BEFORE " + tab2VersionId1 + " - " + tab2VersionId1, "Iceberg snapshot ID does not exists: 0");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE 'bad'", ".* Type varchar\\(3\\) is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE 'bad'", ".* Type varchar\\(3\\) is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.");
assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE NULL", "Table version AS OF/BEFORE expression cannot be NULL for .*");
}

/**
 * Builds a session derived from {@code getSession()} with the LEGACY_TIMESTAMP system
 * property set to {@code legacyTimestamp}. The session time zone key is overridden with
 * {@code zoneId} only when {@code legacyTimestamp} is true.
 */
private Session sessionForTimezone(String zoneId, boolean legacyTimestamp)
{
    SessionBuilder builder = Session.builder(getSession())
            .setSystemProperty(LEGACY_TIMESTAMP, Boolean.toString(legacyTimestamp));
    if (!legacyTimestamp) {
        return builder.build();
    }
    builder.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(zoneId));
    return builder.build();
}

/**
 * Polls {@code System.currentTimeMillis()} until it returns a value strictly greater than
 * {@code snapshotTimeMillis}, then returns that value.
 *
 * Before polling, asserts that {@code snapshotTimeMillis} is at most 10 ms ahead of the
 * current clock reading.
 */
private long waitUntilAfter(long snapshotTimeMillis)
{
long currentTimeMillis = System.currentTimeMillis();
// Fail instead of spinning when the requested time is more than 10 ms in the future.
assertTrue(snapshotTimeMillis - currentTimeMillis <= 10,
format("Snapshot time %s is greater than the current time %s by more than 10ms", snapshotTimeMillis, currentTimeMillis));

// Busy-wait: re-read the clock until it has moved past snapshotTimeMillis.
while (currentTimeMillis <= snapshotTimeMillis) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge fan of the busy loop but I know we do this elsewhere, so I won't block on this but it wastes a lot of resources especially for concurrent tests.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point, but sometimes we do need to make sure that several consecutive actions happen in different milliseconds. I'd like to to make a detailed investigation into the time frame library later to see if there is a more suitable non busy loop waiting solution, and open for any better thoughts.

currentTimeMillis = System.currentTimeMillis();
}
return currentTimeMillis;
}

/**
 * Renders an epoch-millisecond instant as a timestamp string without a time zone,
 * using the local date-time that the instant has in the given zone.
 *
 * @param timeMillsUtc milliseconds since the Unix epoch
 * @param zoneId id of the zone whose local date-time is rendered, e.g. "UTC" or "Asia/Shanghai"
 * @return the local date-time in {@code zoneId}, formatted as {@code yyyy-MM-dd HH:mm:ss.SSS}
 */
private String getTimestampString(long timeMillsUtc, String zoneId)
{
    LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timeMillsUtc), ZoneId.of(zoneId));
    // The zone is already applied above. A LocalDateTime carries no zone and the pattern prints
    // none, so a withZone(...) override on the formatter would have no effect.
    return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(localDateTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.common.type.MapType;
import com.facebook.presto.common.type.RealType;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarcharType;
Expand Down Expand Up @@ -1415,9 +1416,9 @@ private Optional<TableHandle> processTableVersion(Table table, QualifiedObjectNa
}
Object evalStateExpr = evaluateConstantExpression(stateExpr, stateExprType, metadata, session, analysis.getParameters());
if (tableVersionType == TIMESTAMP) {
if (!(stateExprType instanceof TimestampWithTimeZoneType)) {
if (!(stateExprType instanceof TimestampWithTimeZoneType || stateExprType instanceof TimestampType)) {
throw new SemanticException(TYPE_MISMATCH, stateExpr,
"Type %s is invalid. Supported table version AS OF/BEFORE expression type is Timestamp with Time Zone.",
"Type %s is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.",
stateExprType.getDisplayName());
}
}
Expand Down